diff --git a/Cargo.lock b/Cargo.lock index 31ac9d42ad..0ce2a60029 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5251,6 +5251,26 @@ dependencies = [ "zeroize", ] +[[package]] +name = "tari_chat_client" +version = "0.48.0-pre.1" +dependencies = [ + "anyhow", + "diesel", + "lmdb-zero", + "tari_common", + "tari_common_sqlite", + "tari_common_types", + "tari_comms", + "tari_comms_dht", + "tari_contacts", + "tari_p2p", + "tari_service_framework", + "tari_shutdown", + "tari_storage", + "tari_test_utils", +] + [[package]] name = "tari_common" version = "0.50.0-pre.0" @@ -5475,8 +5495,10 @@ dependencies = [ "diesel", "diesel_migrations", "futures 0.3.26", - "libsqlite3-sys", "log", + "num-derive", + "num-traits", + "prost 0.9.0", "rand 0.7.3", "tari_common", "tari_common_sqlite", @@ -5493,6 +5515,7 @@ dependencies = [ "thiserror", "tokio", "tower", + "uuid", ] [[package]] @@ -5616,11 +5639,14 @@ dependencies = [ "tari_app_utilities", "tari_base_node", "tari_base_node_grpc_client", + "tari_chat_client", "tari_common", + "tari_common_sqlite", "tari_common_types", "tari_comms", "tari_comms_dht", "tari_console_wallet", + "tari_contacts", "tari_core", "tari_crypto", "tari_merge_mining_proxy", @@ -6763,6 +6789,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "936e4b492acfd135421d8dca4b1aa80a7bfc26e702ef3af710e0752684df5372" +[[package]] +name = "uuid" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b55a3fef2a1e3b3a00ce878640918820d3c51081576ac657d23af9fc7928fdb" +dependencies = [ + "getrandom 0.2.8", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/applications/tari_console_wallet/log4rs_sample.yml b/applications/tari_console_wallet/log4rs_sample.yml index ed01144f82..34fab5373e 100644 --- a/applications/tari_console_wallet/log4rs_sample.yml +++ b/applications/tari_console_wallet/log4rs_sample.yml @@ -80,6 +80,23 @@ appenders: encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n}" + # An appender named "contacts" that writes to a file with a custom pattern encoder + contacts: + kind: rolling_file + path: "{{log_dir}}/log/wallet/contacts.log" + policy: + kind: compound + trigger: + kind: size + limit: 10mb + roller: + kind: fixed_window + base: 1 + count: 5 + pattern: "{{log_dir}}/log/wallet/contacts.{}.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n}" + # root (to base_layer) root: level: debug @@ -116,6 +133,11 @@ loggers: appenders: - network additive: false + contacts: + level: info + appenders: + - contacts + additive: false comms::noise: level: error appenders: diff --git a/applications/tari_console_wallet/src/ui/state/app_state.rs b/applications/tari_console_wallet/src/ui/state/app_state.rs index 8c102b20c1..8ff31563ad 100644 --- a/applications/tari_console_wallet/src/ui/state/app_state.rs +++ b/applications/tari_console_wallet/src/ui/state/app_state.rs @@ -42,7 +42,7 @@ use tari_comms::{ net_address::{MultiaddressesWithStats, PeerAddressSource}, peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags}, }; -use tari_contacts::contacts_service::{handle::ContactsLivenessEvent, storage::database::Contact}; +use tari_contacts::contacts_service::{handle::ContactsLivenessEvent, types::Contact}; use tari_core::transactions::{ tari_amount::{uT, MicroTari}, transaction_components::OutputFeatures, diff --git a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs index 42938e7ba6..32ea9c33f4 100644 --- a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs +++ b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs @@ -266,7 +266,7 @@ impl WalletEventMonitor { ); self.trigger_contacts_refresh().await; } - ContactsLivenessEvent::NetworkSilence => {} + ContactsLivenessEvent::NetworkSilence => {}, } } Err(broadcast::error::RecvError::Lagged(n)) => { diff --git a/applications/tari_console_wallet/src/ui/ui_contact.rs b/applications/tari_console_wallet/src/ui/ui_contact.rs index 1426aa975a..d10266d459 100644 --- a/applications/tari_console_wallet/src/ui/ui_contact.rs +++ b/applications/tari_console_wallet/src/ui/ui_contact.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use chrono::{DateTime, Local}; -use tari_contacts::contacts_service::storage::database::Contact; +use tari_contacts::contacts_service::types::Contact; #[derive(Debug, Clone)] pub struct UiContact { diff --git a/base_layer/contacts/Cargo.toml b/base_layer/contacts/Cargo.toml index 488d445062..cf64de0250 100644 --- a/base_layer/contacts/Cargo.toml +++ b/base_layer/contacts/Cargo.toml @@ -8,34 +8,37 @@ edition = "2018" [dependencies] tari_common = { path = "../../common" } +tari_common_sqlite = { path = "../../common_sqlite" } tari_common_types = { path = "../../base_layer/common_types" } tari_comms = { path = "../../comms/core" } +tari_comms_dht = { path = "../../comms/dht" } tari_crypto = { version = "0.16.11"} tari_p2p = { path = "../p2p", features = ["auto-update"] } tari_service_framework = { path = "../service_framework" } tari_shutdown = { path = "../../infrastructure/shutdown" } -tari_common_sqlite = { path = "../../common_sqlite" } tari_utilities = "0.4.10" -tokio = { version = "1.23", features = ["sync", "macros"] } chrono = { version = "0.4.19", default-features = false, features = ["serde"] } diesel = { version = "2.0.3", features = ["sqlite", "serde_json", "chrono", "64-column-tables"] } diesel_migrations = "2.0.0" futures = { version = "^0.3.1", features = ["compat", "std"] } -libsqlite3-sys = { version = "0.25.1", features = ["bundled"], optional = true } log = "0.4.6" +num-derive = "0.3.3" +num-traits = "0.2.15" +prost = "0.9" rand = "0.7.3" thiserror = "1.0.26" +tokio = { version = "1.23", features = ["sync", "macros"] } tower = "0.4" +uuid = { version = "1.3.1", features = ["v4"] } [dev-dependencies] -tari_test_utils = { path = "../../infrastructure/test_utils" } tari_comms_dht = { path = "../../comms/dht", features = ["test-mocks"] } +tari_test_utils = { path = "../../infrastructure/test_utils" } tempfile = "3.1.0" -[features] -default=["bundled_sqlite"] -bundled_sqlite = ["libsqlite3-sys"] +[build-dependencies] +tari_common = { path = "../../common" } [package.metadata.cargo-machete] -ignored = ["libsqlite3-sys"] # this is so we can run cargo machete without getting false positive about macro dependancies +ignored = ["prost"] # this is so we can run cargo machete without getting false positive about macro dependancies diff --git a/base_layer/contacts/build.rs b/base_layer/contacts/build.rs new file mode 100644 index 0000000000..f1858cc133 --- /dev/null +++ b/base_layer/contacts/build.rs @@ -0,0 +1,31 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +fn main() -> Result<(), Box> { + tari_common::build::ProtobufCompiler::new() + .proto_paths(&["proto"]) + .include_paths(&["proto"]) + .emit_rerun_if_changed_directives() + .compile() + .unwrap(); + Ok(()) +} diff --git a/base_layer/contacts/migrations/2023-03-14-101758_create_messages/down.sql b/base_layer/contacts/migrations/2023-03-14-101758_create_messages/down.sql new file mode 100644 index 0000000000..358d2f591e --- /dev/null +++ b/base_layer/contacts/migrations/2023-03-14-101758_create_messages/down.sql @@ -0,0 +1,3 @@ +DROP INDEX idx_messages_address; + +DROP TABLE IF EXISTS messages; diff --git a/base_layer/contacts/migrations/2023-03-14-101758_create_messages/up.sql b/base_layer/contacts/migrations/2023-03-14-101758_create_messages/up.sql new file mode 100644 index 0000000000..1d8b4f0020 --- /dev/null +++ b/base_layer/contacts/migrations/2023-03-14-101758_create_messages/up.sql @@ -0,0 +1,9 @@ +CREATE TABLE messages ( + address BLOB NOT NULL, + message_id BLOB PRIMARY KEY NOT NULL, + body BLOB NOT NULL, + stored_at TIMESTAMP NOT NULL, + direction INTEGER NOT NULL +); + +CREATE INDEX idx_messages_address ON messages (address); diff --git a/base_layer/contacts/proto/message.proto b/base_layer/contacts/proto/message.proto new file mode 100644 index 0000000000..e528929527 --- /dev/null +++ b/base_layer/contacts/proto/message.proto @@ -0,0 +1,18 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +syntax = "proto3"; +package tari.contacts.chat; + +message Message { + bytes body = 1; + bytes address = 2; + DirectionEnum direction = 3; + uint64 stored_at = 4; + bytes message_id = 5; +} + +enum DirectionEnum { + Inbound = 0; + Outbound = 1; +} \ No newline at end of file diff --git a/base_layer/contacts/src/contacts_service/error.rs b/base_layer/contacts/src/contacts_service/error.rs index 634a100822..8bc6794e5f 100644 --- a/base_layer/contacts/src/contacts_service/error.rs +++ b/base_layer/contacts/src/contacts_service/error.rs @@ -23,6 +23,7 @@ use diesel::result::Error as DieselError; use tari_common_sqlite::error::SqliteStorageError; use tari_comms::connectivity::ConnectivityError; +use tari_comms_dht::outbound::DhtOutboundError; use tari_p2p::services::liveness::error::LivenessError; use tari_service_framework::reply_channel::TransportChannelError; use thiserror::Error; @@ -44,6 +45,8 @@ pub enum ContactsServiceError { LivenessError(#[from] LivenessError), #[error("ConnectivityError error: `{0}`")] ConnectivityError(#[from] ConnectivityError), + #[error("Outbound comms error: `{0}`")] + OutboundCommsError(#[from] DhtOutboundError), } #[derive(Debug, Error)] diff --git a/base_layer/contacts/src/contacts_service/handle.rs b/base_layer/contacts/src/contacts_service/handle.rs index 165a112d34..7008b31c79 100644 --- a/base_layer/contacts/src/contacts_service/handle.rs +++ b/base_layer/contacts/src/contacts_service/handle.rs @@ -35,7 +35,7 @@ use tower::Service; use crate::contacts_service::{ error::ContactsServiceError, service::{ContactMessageType, ContactOnlineStatus}, - storage::database::Contact, + types::{Contact, Message}, }; #[derive(Debug, Clone, PartialEq, Eq)] @@ -130,6 +130,8 @@ pub enum ContactsServiceRequest { RemoveContact(TariAddress), GetContacts, GetContactOnlineStatus(Contact), + SendMessage(TariAddress, Message), + GetAllMessages(TariAddress), } #[derive(Debug)] @@ -139,6 +141,8 @@ pub enum ContactsServiceResponse { Contact(Contact), Contacts(Vec), OnlineStatus(ContactOnlineStatus), + Messages(Vec), + MessageSent, } #[derive(Clone)] @@ -224,4 +228,26 @@ impl ContactsServiceHandle { _ => Err(ContactsServiceError::UnexpectedApiResponse), } } + + pub async fn get_all_messages(&mut self, pk: TariAddress) -> Result, ContactsServiceError> { + match self + .request_response_service + .call(ContactsServiceRequest::GetAllMessages(pk)) + .await?? + { + ContactsServiceResponse::Messages(messages) => Ok(messages), + _ => Err(ContactsServiceError::UnexpectedApiResponse), + } + } + + pub async fn send_message(&mut self, message: Message) -> Result<(), ContactsServiceError> { + match self + .request_response_service + .call(ContactsServiceRequest::SendMessage(message.address.clone(), message)) + .await?? + { + ContactsServiceResponse::MessageSent => Ok(()), + _ => Err(ContactsServiceError::UnexpectedApiResponse), + } + } } diff --git a/base_layer/contacts/src/contacts_service/mod.rs b/base_layer/contacts/src/contacts_service/mod.rs index 7d7a13e18d..8ada6521c8 100644 --- a/base_layer/contacts/src/contacts_service/mod.rs +++ b/base_layer/contacts/src/contacts_service/mod.rs @@ -22,15 +22,18 @@ pub mod error; pub mod handle; +pub mod proto; pub mod service; pub mod storage; +pub mod types; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use futures::future; use log::*; use tari_comms::connectivity::ConnectivityRequester; -use tari_p2p::services::liveness::LivenessHandle; +use tari_comms_dht::Dht; +use tari_p2p::{comms_connector::SubscriptionFactory, services::liveness::LivenessHandle}; use tari_service_framework::{ async_trait, reply_channel, @@ -54,16 +57,23 @@ where T: ContactsBackend backend: Option, contacts_auto_ping_interval: Duration, contacts_online_ping_window: usize, + subscription_factory: Arc, } impl ContactsServiceInitializer where T: ContactsBackend { - pub fn new(backend: T, contacts_auto_ping_interval: Duration, online_ping_window: usize) -> Self { + pub fn new( + backend: T, + subscription_factory: Arc, + contacts_auto_ping_interval: Duration, + online_ping_window: usize, + ) -> Self { Self { backend: Some(backend), contacts_auto_ping_interval, contacts_online_ping_window: online_ping_window, + subscription_factory, } } } @@ -90,9 +100,11 @@ where T: ContactsBackend + 'static let contacts_auto_ping_interval = self.contacts_auto_ping_interval; let contacts_online_ping_window = self.contacts_online_ping_window; + let subscription_factory = self.subscription_factory.clone(); context.spawn_when_ready(move |handles| async move { let liveness = handles.expect_handle::(); let connectivity = handles.expect_handle::(); + let dht = handles.expect_handle::(); let service = ContactsService::new( ContactsDatabase::new(backend), @@ -100,6 +112,8 @@ where T: ContactsBackend + 'static handles.get_shutdown_signal(), liveness, connectivity, + dht, + subscription_factory, publisher, contacts_auto_ping_interval, contacts_online_ping_window, diff --git a/base_layer/contacts/src/contacts_service/proto/mod.rs b/base_layer/contacts/src/contacts_service/proto/mod.rs new file mode 100644 index 0000000000..978869c643 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/proto/mod.rs @@ -0,0 +1,23 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +include!(concat!(env!("OUT_DIR"), "/tari.contacts.chat.rs")); diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index bc34570aa1..627c56fa00 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -30,20 +30,34 @@ use std::{ use chrono::{NaiveDateTime, Utc}; use futures::{pin_mut, StreamExt}; use log::*; +use tari_common_types::tari_address::TariAddress; use tari_comms::connectivity::{ConnectivityEvent, ConnectivityRequester}; -use tari_p2p::services::liveness::{LivenessEvent, LivenessHandle, MetadataKey, PingPongEvent}; +use tari_comms_dht::{domain_message::OutboundDomainMessage, outbound::OutboundEncryption, Dht}; +use tari_p2p::{ + comms_connector::SubscriptionFactory, + domain_message::DomainMessage, + services::{ + liveness::{LivenessEvent, LivenessHandle, MetadataKey, PingPongEvent}, + utils::{map_decode, ok_or_skip_result}, + }, + tari_message::TariMessageType, +}; use tari_service_framework::reply_channel; use tari_shutdown::ShutdownSignal; +use tari_utilities::ByteArray; use tokio::sync::broadcast; use crate::contacts_service::{ error::ContactsServiceError, handle::{ContactsLivenessData, ContactsLivenessEvent, ContactsServiceRequest, ContactsServiceResponse}, - storage::database::{Contact, ContactsBackend, ContactsDatabase}, + proto, + storage::database::{ContactsBackend, ContactsDatabase}, + types::{Contact, Message}, }; const LOG_TARGET: &str = "contacts::contacts_service"; const NUM_ROUNDS_NETWORK_SILENCE: u16 = 3; +pub const SUBSCRIPTION_LABEL: &str = "Chat"; #[derive(Debug, Clone, PartialEq, Eq)] pub enum ContactMessageType { @@ -91,6 +105,8 @@ where T: ContactsBackend + 'static liveness: LivenessHandle, liveness_data: Vec, connectivity: ConnectivityRequester, + dht: Dht, + subscription_factory: Arc, event_publisher: broadcast::Sender>, number_of_rounds_no_pings: u16, contacts_auto_ping_interval: Duration, @@ -109,6 +125,8 @@ where T: ContactsBackend + 'static shutdown_signal: ShutdownSignal, liveness: LivenessHandle, connectivity: ConnectivityRequester, + dht: Dht, + subscription_factory: Arc, event_publisher: broadcast::Sender>, contacts_auto_ping_interval: Duration, contacts_online_ping_window: usize, @@ -120,6 +138,8 @@ where T: ContactsBackend + 'static liveness, liveness_data: Vec::new(), connectivity, + dht, + subscription_factory, event_publisher, number_of_rounds_no_pings: 0, contacts_auto_ping_interval, @@ -141,6 +161,13 @@ where T: ContactsBackend + 'static let connectivity_events = self.connectivity.get_event_subscription(); pin_mut!(connectivity_events); + let chat_messages = self + .subscription_factory + .get_subscription(TariMessageType::Chat, SUBSCRIPTION_LABEL) + .map(map_decode::) + .filter_map(ok_or_skip_result); + pin_mut!(chat_messages); + let shutdown = self .shutdown_signal .take() @@ -156,6 +183,13 @@ where T: ContactsBackend + 'static debug!(target: LOG_TARGET, "Contacts Service started"); loop { tokio::select! { + // Incoming chat messages + Some(msg) = chat_messages.next() => { + if let Err(err) = self.handle_incoming_message(msg).await { + warn!(target: LOG_TARGET, "Failed to handle incoming chat message: {}", err); + } + }, + Some(request_context) = request_stream.next() => { let (request, reply_tx) = request_context.split(); let response = self.handle_request(request).await.map_err(|e| { @@ -177,7 +211,7 @@ where T: ContactsBackend + 'static Ok(event) = connectivity_events.recv() => { self.handle_connectivity_event(event); - } + }, _ = shutdown.wait() => { info!(target: LOG_TARGET, "Contacts service shutting down because it received the shutdown signal"); @@ -203,10 +237,10 @@ where T: ContactsBackend + 'static }, ContactsServiceRequest::UpsertContact(c) => { self.db.upsert_contact(c.clone())?; - self.liveness.check_add_monitored_peer(c.node_id).await?; + self.liveness.check_add_monitored_peer(c.node_id.clone()).await?; info!( target: LOG_TARGET, - "Contact Saved: \nAlias: {}\nAddress: {} ", c.alias, c.address + "Contact Saved: \nAlias: {}\nAddress: {}\nNodeId: {}", c.alias, c.address, c.node_id ); Ok(ContactsServiceResponse::ContactSaved) }, @@ -232,6 +266,47 @@ where T: ContactsBackend + 'static let result = self.get_online_status(&contact).await; Ok(result.map(ContactsServiceResponse::OnlineStatus)?) }, + ContactsServiceRequest::GetAllMessages(pk) => { + let result = self.db.get_messages(pk); + Ok(result.map(ContactsServiceResponse::Messages)?) + }, + ContactsServiceRequest::SendMessage(address, mut message) => { + let contact = match self.db.get_contact(address.clone()) { + Ok(contact) => contact, + Err(_) => Contact::from(&address), + }; + + let ob_message = OutboundDomainMessage::from(message.clone()); + let encryption = OutboundEncryption::EncryptFor(Box::new(address.public_key().clone())); + + match self.get_online_status(&contact).await { + Ok(ContactOnlineStatus::Online) => { + info!(target: LOG_TARGET, "Chat message being sent directed"); + let mut comms_outbound = self.dht.outbound_requester(); + + comms_outbound + .send_direct_encrypted( + address.public_key().clone(), + ob_message, + encryption, + "contact service messaging".to_string(), + ) + .await?; + }, + Err(e) => return Err(e), + _ => { + let mut comms_outbound = self.dht.outbound_requester(); + comms_outbound + .closest_broadcast(address.public_key().clone(), encryption, vec![], ob_message) + .await?; + }, + } + + message.stored_at = Utc::now().naive_utc().timestamp() as u64; + self.db.save_message(message)?; + + Ok(ContactsServiceResponse::MessageSent) + }, } } @@ -304,6 +379,26 @@ where T: ContactsBackend + 'static Ok(()) } + async fn handle_incoming_message( + &mut self, + msg: DomainMessage, + ) -> Result<(), ContactsServiceError> { + if let Some(source_public_key) = msg.authenticated_origin { + let DomainMessage::<_> { inner: msg, .. } = msg; + + let message = Message::from(msg.clone()); + let message = Message { + address: TariAddress::from_public_key(&source_public_key, message.address.network()), + stored_at: Utc::now().naive_utc().timestamp() as u64, + ..msg.into() + }; + + self.db.save_message(message).expect("Couldn't save the message"); + } + + Ok(()) + } + async fn get_online_status(&self, contact: &Contact) -> Result { let mut online_status = ContactOnlineStatus::NeverSeen; if let Some(peer_data) = self.connectivity.get_peer_info(contact.node_id.clone()).await? { diff --git a/base_layer/contacts/src/contacts_service/storage/database.rs b/base_layer/contacts/src/contacts_service/storage/database.rs index e70a60898c..849f5cb5b8 100644 --- a/base_layer/contacts/src/contacts_service/storage/database.rs +++ b/base_layer/contacts/src/contacts_service/storage/database.rs @@ -30,39 +30,13 @@ use log::*; use tari_common_types::tari_address::TariAddress; use tari_comms::peer_manager::NodeId; -use crate::contacts_service::error::ContactsServiceStorageError; +use crate::contacts_service::{ + error::ContactsServiceStorageError, + types::{Contact, Message}, +}; const LOG_TARGET: &str = "contacts::contacts_service::database"; -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Contact { - pub alias: String, - pub address: TariAddress, - pub node_id: NodeId, - pub last_seen: Option, - pub latency: Option, - pub favourite: bool, -} - -impl Contact { - pub fn new( - alias: String, - address: TariAddress, - last_seen: Option, - latency: Option, - favourite: bool, - ) -> Self { - Self { - alias, - node_id: NodeId::from_key(address.public_key()), - address, - last_seen, - latency, - favourite, - } - } -} - /// This trait defines the functionality that a database backend need to provide for the Contacts Service pub trait ContactsBackend: Send + Sync + Clone { /// Retrieve the record associated with the provided DbKey @@ -76,12 +50,15 @@ pub enum DbKey { Contact(TariAddress), ContactId(NodeId), Contacts, + Messages(TariAddress), } pub enum DbValue { Contact(Box), Contacts(Vec), TariAddress(Box), + Message(Box), + Messages(Vec), } #[allow(clippy::large_enum_variant)] @@ -91,6 +68,7 @@ pub enum DbKeyValuePair { } pub enum WriteOperation { + Insert(Box), Upsert(Box), UpdateLastSeen(Box), Remove(DbKey), @@ -177,11 +155,32 @@ where T: ContactsBackend + 'static .ok_or_else(|| ContactsServiceStorageError::ValueNotFound(DbKey::Contact(address.clone())))?; match result { DbValue::Contact(c) => Ok(*c), - DbValue::Contacts(_) | DbValue::TariAddress(_) => Err(ContactsServiceStorageError::UnexpectedResult( + _ => Err(ContactsServiceStorageError::UnexpectedResult( "Incorrect response from backend.".to_string(), )), } } + + pub fn get_messages(&self, address: TariAddress) -> Result, ContactsServiceStorageError> { + let key = DbKey::Messages(address); + let db_clone = self.db.clone(); + match db_clone.fetch(&key) { + Ok(None) => log_error( + key, + ContactsServiceStorageError::UnexpectedResult("Could not retrieve messages".to_string()), + ), + Ok(Some(DbValue::Messages(messages))) => Ok(messages), + Ok(Some(other)) => unexpected_result(key, other), + Err(e) => log_error(key, e), + } + } + + pub fn save_message(&self, message: Message) -> Result<(), ContactsServiceStorageError> { + self.db + .write(WriteOperation::Insert(Box::new(DbValue::Message(Box::new(message)))))?; + + Ok(()) + } } fn unexpected_result(req: DbKey, res: DbValue) -> Result { @@ -196,6 +195,7 @@ impl Display for DbKey { DbKey::Contact(c) => f.write_str(&format!("Contact: {:?}", c)), DbKey::ContactId(id) => f.write_str(&format!("Contact: {:?}", id)), DbKey::Contacts => f.write_str("Contacts"), + DbKey::Messages(c) => f.write_str(&format!("Messages for id: {:?}", c)), } } } @@ -206,6 +206,8 @@ impl Display for DbValue { DbValue::Contact(_) => f.write_str("Contact"), DbValue::Contacts(_) => f.write_str("Contacts"), DbValue::TariAddress(_) => f.write_str("Address"), + DbValue::Messages(_) => f.write_str("Messages"), + DbValue::Message(_) => f.write_str("Message"), } } } diff --git a/base_layer/contacts/src/contacts_service/storage/mod.rs b/base_layer/contacts/src/contacts_service/storage/mod.rs index dd5a7bbc6e..4c3855d1b1 100644 --- a/base_layer/contacts/src/contacts_service/storage/mod.rs +++ b/base_layer/contacts/src/contacts_service/storage/mod.rs @@ -22,3 +22,4 @@ pub mod database; pub mod sqlite_db; +pub mod types; diff --git a/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs b/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs index 3977c69942..d7acb3a530 100644 --- a/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs +++ b/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs @@ -22,24 +22,22 @@ use std::{convert::TryFrom, io::Write, sync::Arc}; -use chrono::NaiveDateTime; -use diesel::{prelude::*, result::Error as DieselError, SqliteConnection}; +use diesel::result::Error as DieselError; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; -use tari_common_sqlite::{ - error::SqliteStorageError, - sqlite_connection_pool::PooledDbConnection, - util::diesel_ext::ExpectedRowsExtension, -}; +use tari_common_sqlite::{error::SqliteStorageError, sqlite_connection_pool::PooledDbConnection}; use tari_common_types::tari_address::TariAddress; -use tari_comms::peer_manager::NodeId; use tari_utilities::ByteArray; -use crate::{ - contacts_service::{ - error::ContactsServiceStorageError, - storage::database::{Contact, ContactsBackend, DbKey, DbKeyValuePair, DbValue, WriteOperation}, +use crate::contacts_service::{ + error::ContactsServiceStorageError, + storage::{ + database::{ContactsBackend, DbKey, DbKeyValuePair, DbValue, WriteOperation}, + types::{ + contacts::{ContactSql, UpdateContact}, + messages::{MessagesSql, MessagesSqlInsert}, + }, }, - schema::contacts, + types::{Contact, Message}, }; const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); @@ -106,6 +104,16 @@ where TContactServiceDbConnection: PooledDbConnection, _>>()?, )), + DbKey::Messages(address) => match MessagesSql::find_by_address(&address.to_bytes(), &mut conn) { + Ok(messages) => Some(DbValue::Messages( + messages + .iter() + .map(|m| Message::try_from(m.clone()).expect("Couldn't cast MessageSql to Message")) + .collect::>(), + )), + Err(ContactsServiceStorageError::DieselError(DieselError::NotFound)) => None, + Err(e) => return Err(e), + }, }; Ok(result) @@ -162,6 +170,12 @@ where TContactServiceDbConnection: PooledDbConnection return Err(e), }, DbKey::Contacts => return Err(ContactsServiceStorageError::OperationNotSupported), + DbKey::Messages(_pk) => return Err(ContactsServiceStorageError::OperationNotSupported), + }, + WriteOperation::Insert(i) => { + if let DbValue::Message(m) = *i { + MessagesSqlInsert::from(*m).commit(&mut conn)?; + } }, } @@ -169,155 +183,6 @@ where TContactServiceDbConnection: PooledDbConnection, - node_id: Vec, - alias: String, - last_seen: Option, - latency: Option, - favourite: i32, -} - -impl ContactSql { - /// Write this struct to the database - pub fn commit(&self, conn: &mut SqliteConnection) -> Result<(), ContactsServiceStorageError> { - diesel::insert_into(contacts::table) - .values(self.clone()) - .execute(conn)?; - Ok(()) - } - - /// Return all contacts - pub fn index(conn: &mut SqliteConnection) -> Result, ContactsServiceStorageError> { - Ok(contacts::table.load::(conn)?) - } - - /// Find a particular Contact by their address, if it exists - pub fn find_by_address( - address: &[u8], - conn: &mut SqliteConnection, - ) -> Result { - Ok(contacts::table - .filter(contacts::address.eq(address)) - .first::(conn)?) - } - - /// Find a particular Contact by their node ID, if it exists - pub fn find_by_node_id( - node_id: &[u8], - conn: &mut SqliteConnection, - ) -> Result { - Ok(contacts::table - .filter(contacts::node_id.eq(node_id)) - .first::(conn)?) - } - - /// Find a particular Contact by their address, and update it if it exists, returning the affected record - pub fn find_by_address_and_update( - conn: &mut SqliteConnection, - address: &[u8], - updated_contact: UpdateContact, - ) -> Result { - // Note: `get_result` not implemented for SQLite - diesel::update(contacts::table.filter(contacts::address.eq(address))) - .set(updated_contact) - .execute(conn) - .num_rows_affected_or_not_found(1)?; - ContactSql::find_by_address(address, conn) - } - - /// Find a particular Contact by their address, and delete it if it exists, returning the affected record - pub fn find_by_address_and_delete( - conn: &mut SqliteConnection, - address: &[u8], - ) -> Result { - // Note: `get_result` not implemented for SQLite - let contact = ContactSql::find_by_address(address, conn)?; - if diesel::delete(contacts::table.filter(contacts::address.eq(address))).execute(conn)? == 0 { - return Err(ContactsServiceStorageError::ValuesNotFound); - } - Ok(contact) - } - - /// Find a particular Contact by their node ID, and update it if it exists, returning the affected record - pub fn find_by_node_id_and_update( - conn: &mut SqliteConnection, - node_id: &[u8], - updated_contact: UpdateContact, - ) -> Result { - // Note: `get_result` not implemented for SQLite - diesel::update(contacts::table.filter(contacts::node_id.eq(node_id))) - .set(updated_contact) - .execute(conn) - .num_rows_affected_or_not_found(1)?; - ContactSql::find_by_node_id(node_id, conn) - } - - /// Find a particular Contact by their node ID, and delete it if it exists, returning the affected record - pub fn find_by_node_id_and_delete( - conn: &mut SqliteConnection, - node_id: &[u8], - ) -> Result { - // Note: `get_result` not implemented for SQLite - let contact = ContactSql::find_by_node_id(node_id, conn)?; - if diesel::delete(contacts::table.filter(contacts::node_id.eq(node_id))).execute(conn)? == 0 { - return Err(ContactsServiceStorageError::ValuesNotFound); - } - Ok(contact) - } -} - -/// Conversion from an Contact to the Sql datatype form -impl TryFrom for Contact { - type Error = ContactsServiceStorageError; - - #[allow(clippy::cast_sign_loss)] - fn try_from(o: ContactSql) -> Result { - let address = TariAddress::from_bytes(&o.address).map_err(|_| ContactsServiceStorageError::ConversionError)?; - Ok(Self { - // Public key must always be the master data source for node ID here - node_id: NodeId::from_key(address.public_key()), - address, - alias: o.alias, - last_seen: o.last_seen, - latency: o.latency.map(|val| val as u32), - favourite: match o.favourite { - 0 => false, - 1 => true, - _ => return Err(ContactsServiceStorageError::ConversionError), - }, - }) - } -} - -/// Conversion from a Contact to the Sql datatype form -#[allow(clippy::cast_possible_wrap)] -impl From for ContactSql { - fn from(o: Contact) -> Self { - Self { - // Public key must always be the master data source for node ID here - node_id: NodeId::from_key(o.address.public_key()).to_vec(), - address: o.address.to_bytes().to_vec(), - alias: o.alias, - last_seen: o.last_seen, - latency: o.latency.map(|val| val as i32), - favourite: i32::from(o.favourite), - } - } -} - -#[derive(AsChangeset)] -#[diesel(table_name = contacts)] -pub struct UpdateContact { - alias: Option, - last_seen: Option>, - latency: Option>, - favourite: Option, -} - #[cfg(test)] mod test { use std::convert::{TryFrom, TryInto}; @@ -333,9 +198,9 @@ mod test { use tari_test_utils::{paths::with_temp_dir, random::string}; use super::*; - use crate::contacts_service::storage::{ - database::Contact, - sqlite_db::{ContactSql, UpdateContact}, + use crate::contacts_service::{ + storage::types::contacts::{ContactSql, UpdateContact}, + types::Contact, }; #[test] diff --git a/base_layer/contacts/src/contacts_service/storage/types/contacts.rs b/base_layer/contacts/src/contacts_service/storage/types/contacts.rs new file mode 100644 index 0000000000..0878b2c230 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/storage/types/contacts.rs @@ -0,0 +1,184 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::convert::TryFrom; + +use chrono::NaiveDateTime; +use diesel::{prelude::*, SqliteConnection}; +use tari_common_sqlite::util::diesel_ext::ExpectedRowsExtension; +use tari_common_types::tari_address::TariAddress; +use tari_comms::peer_manager::NodeId; +use tari_utilities::ByteArray; + +use crate::{ + contacts_service::{error::ContactsServiceStorageError, types::Contact}, + schema::contacts, +}; + +/// A Sql version of the Contact struct +#[derive(Clone, Debug, Queryable, Insertable, PartialEq, Eq)] +#[diesel(table_name = contacts)] +pub struct ContactSql { + pub address: Vec, + node_id: Vec, + pub alias: String, + last_seen: Option, + latency: Option, + pub favourite: i32, +} + +impl ContactSql { + /// Write this struct to the database + pub fn commit(&self, conn: &mut SqliteConnection) -> Result<(), ContactsServiceStorageError> { + diesel::insert_into(contacts::table) + .values(self.clone()) + .execute(conn)?; + Ok(()) + } + + /// Return all contacts + pub fn index(conn: &mut SqliteConnection) -> Result, ContactsServiceStorageError> { + Ok(contacts::table.load::(conn)?) + } + + /// Find a particular Contact by their address, if it exists + pub fn find_by_address( + address: &[u8], + conn: &mut SqliteConnection, + ) -> Result { + Ok(contacts::table + .filter(contacts::address.eq(address)) + .first::(conn)?) + } + + /// Find a particular Contact by their node ID, if it exists + pub fn find_by_node_id( + node_id: &[u8], + conn: &mut SqliteConnection, + ) -> Result { + Ok(contacts::table + .filter(contacts::node_id.eq(node_id)) + .first::(conn)?) + } + + /// Find a particular Contact by their address, and update it if it exists, returning the affected record + pub fn find_by_address_and_update( + conn: &mut SqliteConnection, + address: &[u8], + updated_contact: UpdateContact, + ) -> Result { + // Note: `get_result` not implemented for SQLite + diesel::update(contacts::table.filter(contacts::address.eq(address))) + .set(updated_contact) + .execute(conn) + .num_rows_affected_or_not_found(1)?; + ContactSql::find_by_address(address, conn) + } + + /// Find a particular Contact by their address, and delete it if it exists, returning the affected record + pub fn find_by_address_and_delete( + conn: &mut SqliteConnection, + address: &[u8], + ) -> Result { + // Note: `get_result` not implemented for SQLite + let contact = ContactSql::find_by_address(address, conn)?; + if diesel::delete(contacts::table.filter(contacts::address.eq(address))).execute(conn)? == 0 { + return Err(ContactsServiceStorageError::ValuesNotFound); + } + Ok(contact) + } + + /// Find a particular Contact by their node ID, and update it if it exists, returning the affected record + pub fn find_by_node_id_and_update( + conn: &mut SqliteConnection, + node_id: &[u8], + updated_contact: UpdateContact, + ) -> Result { + // Note: `get_result` not implemented for SQLite + diesel::update(contacts::table.filter(contacts::node_id.eq(node_id))) + .set(updated_contact) + .execute(conn) + .num_rows_affected_or_not_found(1)?; + ContactSql::find_by_node_id(node_id, conn) + } + + /// Find a particular Contact by their node ID, and delete it if it exists, returning the affected record + pub fn find_by_node_id_and_delete( + conn: &mut SqliteConnection, + node_id: &[u8], + ) -> Result { + // Note: `get_result` not implemented for SQLite + let contact = ContactSql::find_by_node_id(node_id, conn)?; + diesel::delete(contacts::table.filter(contacts::node_id.eq(node_id))) + .execute(conn) + .num_rows_affected_or_not_found(1)?; + Ok(contact) + } +} + +/// Conversion from an Contact to the Sql datatype form +impl TryFrom for Contact { + type Error = ContactsServiceStorageError; + + #[allow(clippy::cast_sign_loss)] + fn try_from(o: ContactSql) -> Result { + let address = TariAddress::from_bytes(&o.address).map_err(|_| ContactsServiceStorageError::ConversionError)?; + Ok(Self { + // Public key must always be the master data source for node ID here + node_id: NodeId::from_key(address.public_key()), + address, + alias: o.alias, + last_seen: o.last_seen, + latency: o.latency.map(|val| val as u32), + favourite: match o.favourite { + 0 => false, + 1 => true, + _ => return Err(ContactsServiceStorageError::ConversionError), + }, + }) + } +} + +/// Conversion from a Contact to the Sql datatype form +#[allow(clippy::cast_possible_wrap)] +impl From for ContactSql { + fn from(o: Contact) -> Self { + Self { + // Public key must always be the master data source for node ID here + node_id: NodeId::from_key(o.address.public_key()).to_vec(), + address: o.address.to_bytes().to_vec(), + alias: o.alias, + last_seen: o.last_seen, + latency: o.latency.map(|val| val as i32), + favourite: i32::from(o.favourite), + } + } +} + +#[derive(AsChangeset)] +#[diesel(table_name = contacts)] +pub struct UpdateContact { + pub alias: Option, + pub last_seen: Option>, + pub latency: Option>, + pub favourite: Option, +} diff --git a/base_layer/contacts/src/contacts_service/storage/types/messages.rs b/base_layer/contacts/src/contacts_service/storage/types/messages.rs new file mode 100644 index 0000000000..ad4426d86b --- /dev/null +++ b/base_layer/contacts/src/contacts_service/storage/types/messages.rs @@ -0,0 +1,112 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::convert::TryFrom; + +use chrono::NaiveDateTime; +use diesel::{prelude::*, SqliteConnection}; +use tari_common_types::tari_address::TariAddress; + +use crate::{ + contacts_service::{ + error::ContactsServiceStorageError, + types::{Direction, Message}, + }, + schema::messages, +}; + +/// A Sql version of the Contact struct +#[derive(Clone, Debug, Insertable, PartialEq, Eq)] +#[diesel(table_name = messages)] +#[diesel(primary_key(message_id))] +pub struct MessagesSqlInsert { + pub address: Vec, + pub body: Vec, + pub direction: i32, + pub stored_at: NaiveDateTime, + pub message_id: Vec, +} + +#[derive(Clone, Debug, Queryable, PartialEq, Eq, QueryableByName)] +#[diesel(table_name = messages)] +#[diesel(primary_key(message_id))] +pub struct MessagesSql { + pub address: Vec, + pub message_id: Vec, + pub body: Vec, + pub stored_at: NaiveDateTime, + pub direction: i32, +} + +impl MessagesSqlInsert { + /// Write this struct to the database + pub fn commit(&self, conn: &mut SqliteConnection) -> Result<(), ContactsServiceStorageError> { + diesel::insert_into(messages::table) + .values(self.clone()) + .execute(conn)?; + Ok(()) + } +} + +impl MessagesSql { + /// Find a particular message by their address, if it exists + pub fn find_by_address( + address: &[u8], + conn: &mut SqliteConnection, + ) -> Result, ContactsServiceStorageError> { + Ok(messages::table + .filter(messages::address.eq(address)) + .load::(conn)?) + } +} + +/// Conversion from an Message to the Sql datatype form +impl TryFrom for Message { + type Error = ContactsServiceStorageError; + + #[allow(clippy::cast_sign_loss)] + fn try_from(o: MessagesSql) -> Result { + let address = TariAddress::from_bytes(&o.address).map_err(|_| ContactsServiceStorageError::ConversionError)?; + Ok(Self { + address, + direction: Direction::from_byte(o.direction as u8) + .unwrap_or_else(|| panic!("Direction from byte {}", o.direction)), + stored_at: o.stored_at.timestamp() as u64, + body: o.body, + message_id: o.message_id, + }) + } +} + +/// Conversion from a Contact to the Sql datatype form +#[allow(clippy::cast_possible_wrap)] +impl From for MessagesSqlInsert { + fn from(o: Message) -> Self { + Self { + address: o.address.to_bytes().to_vec(), + direction: i32::from(o.direction.as_byte()), + stored_at: NaiveDateTime::from_timestamp_opt(o.stored_at as i64, 0).unwrap(), + body: o.body, + message_id: o.message_id, + } + } +} diff --git a/base_layer/contacts/src/contacts_service/storage/types/mod.rs b/base_layer/contacts/src/contacts_service/storage/types/mod.rs new file mode 100644 index 0000000000..eeb833bfa9 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/storage/types/mod.rs @@ -0,0 +1,24 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +pub mod contacts; +pub mod messages; diff --git a/base_layer/contacts/src/contacts_service/types/contact.rs b/base_layer/contacts/src/contacts_service/types/contact.rs new file mode 100644 index 0000000000..19153a786c --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/contact.rs @@ -0,0 +1,67 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use chrono::NaiveDateTime; +use tari_common_types::tari_address::TariAddress; +use tari_comms::peer_manager::NodeId; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Contact { + pub alias: String, + pub address: TariAddress, + pub node_id: NodeId, + pub last_seen: Option, + pub latency: Option, + pub favourite: bool, +} + +impl Contact { + pub fn new( + alias: String, + address: TariAddress, + last_seen: Option, + latency: Option, + favourite: bool, + ) -> Self { + Self { + alias, + node_id: NodeId::from_key(address.public_key()), + address, + last_seen, + latency, + favourite, + } + } +} + +impl From<&TariAddress> for Contact { + fn from(address: &TariAddress) -> Self { + Self { + alias: address.to_emoji_string(), + address: address.clone(), + node_id: NodeId::from_key(address.public_key()), + last_seen: None, + latency: None, + favourite: false, + } + } +} diff --git a/base_layer/contacts/src/contacts_service/types/message.rs b/base_layer/contacts/src/contacts_service/types/message.rs new file mode 100644 index 0000000000..957009ed9b --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/message.rs @@ -0,0 +1,93 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use num_derive::FromPrimitive; +use num_traits::FromPrimitive; +use tari_common_types::tari_address::TariAddress; +use tari_comms_dht::domain_message::OutboundDomainMessage; +use tari_p2p::tari_message::TariMessageType; +use tari_utilities::ByteArray; + +use crate::contacts_service::proto; + +#[derive(Clone, Debug, Default)] +pub struct Message { + pub body: Vec, + pub address: TariAddress, + pub direction: Direction, + pub stored_at: u64, + pub message_id: Vec, +} + +#[repr(u8)] +#[derive(FromPrimitive, Debug, Copy, Clone)] +pub enum Direction { + Inbound = 0, + Outbound = 1, +} + +impl Direction { + pub fn as_byte(self) -> u8 { + self as u8 + } + + pub fn from_byte(value: u8) -> Option { + FromPrimitive::from_u8(value) + } +} + +impl Default for Direction { + fn default() -> Self { + Self::Outbound + } +} + +impl From for Message { + fn from(message: proto::Message) -> Self { + Self { + body: message.body, + address: TariAddress::from_bytes(&message.address).expect("Couldn't parse address"), + // A Message from a proto::Message will always be an inbound message + direction: Direction::Inbound, + stored_at: message.stored_at, + message_id: message.message_id, + } + } +} + +impl From for proto::Message { + fn from(message: Message) -> Self { + Self { + body: message.body, + address: message.address.to_bytes().to_vec(), + direction: i32::from(message.direction.as_byte()), + stored_at: message.stored_at, + message_id: message.message_id, + } + } +} + +impl From for OutboundDomainMessage { + fn from(message: Message) -> Self { + Self::new(&TariMessageType::Chat, message.into()) + } +} diff --git a/base_layer/contacts/src/contacts_service/types/message_builder.rs b/base_layer/contacts/src/contacts_service/types/message_builder.rs new file mode 100644 index 0000000000..96e95ee3e5 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/message_builder.rs @@ -0,0 +1,68 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use tari_common_types::tari_address::TariAddress; +use tari_utilities::ByteArray; +use uuid::Uuid; + +use crate::contacts_service::types::Message; + +#[derive(Clone, Debug, Default)] +pub struct MessageBuilder { + inner: Message, +} + +impl MessageBuilder { + pub fn new() -> Self { + let message_id = Uuid::new_v4().into_bytes().to_vec(); + + Self { + inner: Message { + message_id, + ..Message::default() + }, + } + } + + pub fn address(&self, address: TariAddress) -> Self { + Self { + inner: Message { + address, + ..self.inner.clone() + }, + } + } + + pub fn message(&self, body: String) -> Self { + let body = body.into_bytes(); + Self { + inner: Message { + body, + ..self.inner.clone() + }, + } + } + + pub fn build(&self) -> Message { + self.inner.clone() + } +} diff --git a/base_layer/contacts/src/contacts_service/types/mod.rs b/base_layer/contacts/src/contacts_service/types/mod.rs new file mode 100644 index 0000000000..89b83734b5 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/mod.rs @@ -0,0 +1,30 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +mod contact; +pub use contact::Contact; + +mod message; +pub use message::{Direction, Message}; + +mod message_builder; +pub use message_builder::MessageBuilder; diff --git a/base_layer/contacts/src/schema.rs b/base_layer/contacts/src/schema.rs index 86b35c8fb9..25f9d3e061 100644 --- a/base_layer/contacts/src/schema.rs +++ b/base_layer/contacts/src/schema.rs @@ -10,3 +10,13 @@ diesel::table! { favourite -> Integer, } } + +diesel::table! { + messages (message_id) { + address -> Binary, + message_id -> Binary, + body -> Binary, + stored_at -> Timestamp, + direction -> Integer, + } +} diff --git a/base_layer/contacts/tests/contacts_service.rs b/base_layer/contacts/tests/contacts_service.rs index 29922b223f..16f5f3d76c 100644 --- a/base_layer/contacts/tests/contacts_service.rs +++ b/base_layer/contacts/tests/contacts_service.rs @@ -32,9 +32,10 @@ use tari_contacts::contacts_service::{ error::{ContactsServiceError, ContactsServiceStorageError}, handle::ContactsServiceHandle, storage::{ - database::{Contact, ContactsBackend, DbKey}, + database::{ContactsBackend, DbKey}, sqlite_db::ContactsServiceSqliteDatabase, }, + types::Contact, ContactsServiceInitializer, }; use tari_crypto::keys::PublicKey as PublicKeyTrait; @@ -92,7 +93,7 @@ pub fn setup_contacts_service( allow_test_addresses: true, listener_liveness_allowlist_cidrs: StringList::new(), listener_liveness_max_sessions: 0, - user_agent: "tari/test-wallet".to_string(), + user_agent: "tari/test-contacts-service".to_string(), rpc_max_simultaneous_sessions: 0, rpc_max_sessions_per_peer: 0, listener_liveness_check_interval: None, @@ -114,9 +115,14 @@ pub fn setup_contacts_service( max_allowed_ping_failures: 0, // Peer with failed ping-pong will never be removed ..Default::default() }, + peer_message_subscription_factory.clone(), + )) + .add_initializer(ContactsServiceInitializer::new( + backend, peer_message_subscription_factory, + Duration::from_secs(5), + 2, )) - .add_initializer(ContactsServiceInitializer::new(backend, Duration::from_secs(5), 2)) .build(); let handles = runtime.block_on(fut).expect("Service initialization failed"); diff --git a/base_layer/core/src/base_node/service/service.rs b/base_layer/core/src/base_node/service/service.rs index 5622e6a5f4..0ff14288ea 100644 --- a/base_layer/core/src/base_node/service/service.rs +++ b/base_layer/core/src/base_node/service/service.rs @@ -391,7 +391,7 @@ async fn handle_incoming_request( ); let send_message_response = outbound_message_service - .send_direct( + .send_direct_unencrypted( origin_public_key, OutboundDomainMessage::new(&TariMessageType::BaseNodeResponse, message), "Outbound response message from base node".to_string(), diff --git a/base_layer/core/tests/mempool.rs b/base_layer/core/tests/mempool.rs index a0cc0be8e9..b1dc9aa9c5 100644 --- a/base_layer/core/tests/mempool.rs +++ b/base_layer/core/tests/mempool.rs @@ -854,7 +854,7 @@ async fn receive_and_propagate_transaction() { alice_node .outbound_message_service - .send_direct( + .send_direct_unencrypted( bob_node.node_identity.public_key().clone(), OutboundDomainMessage::new( &TariMessageType::NewTransaction, @@ -866,7 +866,7 @@ async fn receive_and_propagate_transaction() { .unwrap(); alice_node .outbound_message_service - .send_direct( + .send_direct_unencrypted( carol_node.node_identity.public_key().clone(), OutboundDomainMessage::new( &TariMessageType::NewTransaction, diff --git a/base_layer/p2p/src/proto/message_type.proto b/base_layer/p2p/src/proto/message_type.proto index 7363b2e23d..3073a95550 100644 --- a/base_layer/p2p/src/proto/message_type.proto +++ b/base_layer/p2p/src/proto/message_type.proto @@ -13,6 +13,7 @@ enum TariMessageType { // -- NetMessages -- TariMessageTypePingPong = 1; + TariMessageTypeChat = 2; // -- Blockchain messages -- diff --git a/base_layer/p2p/src/services/liveness/service.rs b/base_layer/p2p/src/services/liveness/service.rs index 0844872272..2d63c27eb6 100644 --- a/base_layer/p2p/src/services/liveness/service.rs +++ b/base_layer/p2p/src/services/liveness/service.rs @@ -226,7 +226,7 @@ where async fn send_pong(&mut self, nonce: u64, dest: CommsPublicKey) -> Result<(), LivenessError> { let msg = PingPongMessage::pong_with_metadata(nonce, self.state.metadata().clone()); self.outbound_messaging - .send_direct( + .send_direct_unencrypted( dest, OutboundDomainMessage::new(&TariMessageType::PingPong, msg), "Sending pong".to_string(), diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs index 3c475f2df2..794dc8d86d 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs @@ -665,7 +665,7 @@ where match self .resources .outbound_message_service - .send_direct( + .send_direct_unencrypted( self.dest_address.public_key().clone(), OutboundDomainMessage::new(&TariMessageType::SenderPartialTransaction, proto_message.clone()), "transaction send".to_string(), diff --git a/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs b/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs index 66ca04aab2..2b438de1e5 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs @@ -108,7 +108,7 @@ pub async fn send_finalized_transaction_message_direct( let mut store_and_forward_send_result = false; let mut direct_send_result = false; match outbound_message_service - .send_direct( + .send_direct_unencrypted( destination_public_key.clone(), OutboundDomainMessage::new( &TariMessageType::TransactionFinalized, diff --git a/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs b/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs index 5cac558ee4..da51dd654c 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs @@ -40,7 +40,7 @@ pub async fn send_transaction_cancelled_message( // Send both direct and SAF we are not going to monitor the progress on these messages for potential resend as // they are just courtesy messages let _send_message_response = outbound_message_service - .send_direct( + .send_direct_unencrypted( destination_public_key.clone(), OutboundDomainMessage::new(&TariMessageType::TransactionCancelled, proto_message.clone()), "transaction cancelled".to_string(), diff --git a/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs b/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs index cb822904a5..462ba452e1 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs @@ -96,7 +96,7 @@ pub async fn send_transaction_reply_direct( .try_into() .map_err(TransactionServiceError::ServiceError)?; match outbound_message_service - .send_direct( + .send_direct_unencrypted( inbound_transaction.source_address.public_key().clone(), OutboundDomainMessage::new(&TariMessageType::ReceiverPartialTransactionReply, proto_message.clone()), "wallet transaction reply".to_string(), diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index ad56e0594c..e16c78db68 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -208,10 +208,11 @@ where max_allowed_ping_failures: 0, // Peer with failed ping-pong will never be removed ..Default::default() }, - peer_message_subscription_factory, + peer_message_subscription_factory.clone(), )) .add_initializer(ContactsServiceInitializer::new( contacts_backend, + peer_message_subscription_factory, config.contacts_auto_ping_interval, config.contacts_online_ping_window, )) diff --git a/base_layer/wallet/tests/wallet.rs b/base_layer/wallet/tests/wallet.rs index 4a5e9a33c3..6a64d3d0f7 100644 --- a/base_layer/wallet/tests/wallet.rs +++ b/base_layer/wallet/tests/wallet.rs @@ -43,7 +43,8 @@ use tari_comms_dht::{store_forward::SafConfig, DhtConfig}; use tari_contacts::contacts_service::{ handle::ContactsLivenessEvent, service::ContactMessageType, - storage::{database::Contact, sqlite_db::ContactsServiceSqliteDatabase}, + storage::sqlite_db::ContactsServiceSqliteDatabase, + types::Contact, }; use tari_core::{ consensus::ConsensusManager, diff --git a/base_layer/wallet_ffi/src/callback_handler.rs b/base_layer/wallet_ffi/src/callback_handler.rs index 3265520b29..44b625ab30 100644 --- a/base_layer/wallet_ffi/src/callback_handler.rs +++ b/base_layer/wallet_ffi/src/callback_handler.rs @@ -379,7 +379,7 @@ where TBackend: TransactionBackend + 'static ); self.trigger_contacts_refresh(data.deref().clone()); } - ContactsLivenessEvent::NetworkSilence => {} + ContactsLivenessEvent::NetworkSilence => {}, } } Err(broadcast::error::RecvError::Lagged(n)) => { diff --git a/base_layer/wallet_ffi/src/callback_handler_tests.rs b/base_layer/wallet_ffi/src/callback_handler_tests.rs index 0fc474ca4c..f5fc269f03 100644 --- a/base_layer/wallet_ffi/src/callback_handler_tests.rs +++ b/base_layer/wallet_ffi/src/callback_handler_tests.rs @@ -25,7 +25,7 @@ mod test { use tari_contacts::contacts_service::{ handle::{ContactsLivenessData, ContactsLivenessEvent}, service::{ContactMessageType, ContactOnlineStatus}, - storage::database::Contact, + types::Contact, }; use tari_core::transactions::{ tari_amount::{uT, MicroTari}, diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 0ebbbb04cd..fa7794b7fe 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -100,7 +100,7 @@ use tari_comms::{ types::CommsPublicKey, }; use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig}; -use tari_contacts::contacts_service::storage::database::Contact; +use tari_contacts::contacts_service::types::Contact; use tari_core::{ borsh::FromBytes, consensus::ConsensusManager, @@ -210,7 +210,7 @@ pub struct TariUnblindedOutputs(Vec); pub struct TariContacts(Vec); -pub type TariContact = tari_contacts::contacts_service::storage::database::Contact; +pub type TariContact = tari_contacts::contacts_service::types::Contact; pub type TariCompletedTransaction = tari_wallet::transaction_service::storage::models::CompletedTransaction; pub type TariTransactionSendStatus = tari_wallet::transaction_service::handle::TransactionSendStatus; pub type TariFeePerGramStats = tari_wallet::transaction_service::handle::FeePerGramStatsResponse; diff --git a/comms/dht/src/outbound/requester.rs b/comms/dht/src/outbound/requester.rs index 0ac3e2e619..33f6bd4568 100644 --- a/comms/dht/src/outbound/requester.rs +++ b/comms/dht/src/outbound/requester.rs @@ -51,7 +51,34 @@ impl OutboundMessageRequester { } /// Send directly to a peer. If the peer does not exist in the peer list, a discovery will be initiated. - pub async fn send_direct( + pub async fn send_direct_encrypted( + &mut self, + dest_public_key: CommsPublicKey, + message: OutboundDomainMessage, + encryption: OutboundEncryption, + source_info: String, + ) -> Result + where + T: prost::Message, + { + self.send_message( + SendMessageParams::new() + .with_debug_info(format!("Send direct to {} from {}", &dest_public_key, source_info)) + .direct_public_key(dest_public_key.clone()) + .with_discovery(true) + .with_encryption(encryption) + .with_destination(dest_public_key.into()) + .finish(), + message, + ) + .await? + .resolve() + .await + .map_err(Into::into) + } + + /// Send directly to a peer unencrypted. If the peer does not exist in the peer list, a discovery will be initiated. + pub async fn send_direct_unencrypted( &mut self, dest_public_key: CommsPublicKey, message: OutboundDomainMessage, diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 3c887fe03b..5a120acb27 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -11,11 +11,14 @@ tari_app_grpc = { path = "../applications/tari_app_grpc" } tari_app_utilities = { path = "../applications/tari_app_utilities" } tari_base_node = { path = "../applications/tari_base_node" } tari_base_node_grpc_client = { path = "../clients/rust/base_node_grpc_client" } +tari_chat_client = { path = "tests/utils/chat_client" } tari_common = { path = "../common" } +tari_common_sqlite = { path = "../common_sqlite" } tari_common_types = { path = "../base_layer/common_types" } tari_comms = { path = "../comms/core" } tari_comms_dht = { path = "../comms/dht" } tari_console_wallet = { path = "../applications/tari_console_wallet" } +tari_contacts = { path = "../base_layer/contacts" } tari_core = { path = "../base_layer/core" } tari_merge_mining_proxy = { path = "../applications/tari_merge_mining_proxy" } tari_miner = { path = "../applications/tari_miner" } @@ -24,8 +27,8 @@ tari_script = { path = "../infrastructure/tari_script" } tari_shutdown = { path = "../infrastructure/shutdown" } tari_utilities = "0.4.10" tari_wallet = { path = "../base_layer/wallet" } -tari_wallet_grpc_client = { path = "../clients/rust/wallet_grpc_client" } tari_wallet_ffi = { path = "../base_layer/wallet_ffi" } +tari_wallet_grpc_client = { path = "../clients/rust/wallet_grpc_client" } anyhow = "1.0.53" async-trait = "0.1.50" diff --git a/integration_tests/log4rs/base_node.yml b/integration_tests/log4rs/base_node.yml index bba162e0e9..094df031c2 100644 --- a/integration_tests/log4rs/base_node.yml +++ b/integration_tests/log4rs/base_node.yml @@ -109,6 +109,11 @@ loggers: level: debug appenders: - network + # Route log events sent to the "contacts" logger to the "network" appender + contacts: + level: debug + appenders: + - network # Route log events sent to the "p2p" logger to the "network" appender p2p: level: debug diff --git a/integration_tests/log4rs/cucumber.yml b/integration_tests/log4rs/cucumber.yml index f5a1c8263d..c79b190338 100644 --- a/integration_tests/log4rs/cucumber.yml +++ b/integration_tests/log4rs/cucumber.yml @@ -63,6 +63,21 @@ appenders: pattern: "{{log_dir}}/log/wallet.{}.log" encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{X(grpc)}] {f}.{L} {i} [{t}] {l:5} {m}{n}" + base_layer_contacts: + kind: rolling_file + path: "{{log_dir}}/log/contacts.log" + policy: + kind: compound + trigger: + kind: size + limit: 100mb + roller: + kind: fixed_window + base: 1 + count: 10 + pattern: "{{log_dir}}/log/contacts.{}.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{X(grpc)}] {f}.{L} {i} [{t}] {l:5} {m}{n}" # An appender named "other" that writes to a file with a custom pattern encoder other: kind: rolling_file @@ -79,6 +94,7 @@ appenders: pattern: "{{log_dir}}/log/other.{}.log" encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{X(grpc)}] {f}.{L} {i} [{t}] {l:5} {m}{n}" + # We don't want prints during cucumber test, everything useful will in logs. # root: # level: warn @@ -114,6 +130,11 @@ loggers: level: debug appenders: - network + # Route log events sent to the "contacts" logger to the "network" appender + contacts: + level: debug + appenders: + - base_layer_contacts # Route log events sent to the "p2p" logger to the "network" appender p2p: level: debug diff --git a/integration_tests/log4rs/wallet.yml b/integration_tests/log4rs/wallet.yml index e42b0ba386..c7c805f7d8 100644 --- a/integration_tests/log4rs/wallet.yml +++ b/integration_tests/log4rs/wallet.yml @@ -69,6 +69,23 @@ appenders: encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] {l:5} {m}{n}" + # An appender named "contacts" that writes to a file with a custom pattern encoder + contacts: + kind: rolling_file + path: "log/wallet/contacts.log" + policy: + kind: compound + trigger: + kind: size + limit: 10mb + roller: + kind: fixed_window + base: 1 + count: 5 + pattern: "log/wallet/contacts.{}.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n}" + # Set the default logging level to "warn" and attach the "stdout" appender to the root root: level: warn @@ -93,6 +110,11 @@ loggers: appenders: - network additive: false + contacts: + level: debug + appenders: + - contacts + additive: false comms::noise: level: error appenders: diff --git a/integration_tests/tests/cucumber.rs b/integration_tests/tests/cucumber.rs index dfcad43f9b..f14bb80f83 100644 --- a/integration_tests/tests/cucumber.rs +++ b/integration_tests/tests/cucumber.rs @@ -38,18 +38,19 @@ use cucumber::{event::ScenarioFinished, gherkin::Scenario, given, then, when, Wo use futures::StreamExt; use indexmap::IndexMap; use log::*; -use rand::Rng; +use rand::{rngs::OsRng, Rng}; use serde_json::Value; use tari_app_grpc::tari_rpc::{self as grpc}; use tari_app_utilities::utilities::UniPublicKey; use tari_base_node::BaseNodeConfig; use tari_base_node_grpc_client::grpc::{GetBlocksRequest, ListHeadersRequest}; +use tari_chat_client::Client; use tari_common::{configuration::Network, initialize_logging}; use tari_common_types::{ tari_address::TariAddress, types::{BlindingFactor, ComAndPubSignature, Commitment, PrivateKey, PublicKey}, }; -use tari_comms::multiaddr::Multiaddr; +use tari_comms::{multiaddr::Multiaddr, peer_manager::PeerFeatures, NodeIdentity}; use tari_console_wallet::{ BurnTariArgs, CliCommands, @@ -61,6 +62,7 @@ use tari_console_wallet::{ SetBaseNodeArgs, WhoisArgs, }; +use tari_contacts::contacts_service::service::ContactOnlineStatus; use tari_core::{ blocks::Block, consensus::ConsensusManager, @@ -100,8 +102,9 @@ use thiserror::Error; use tokio::runtime::Runtime; use crate::utils::{ - base_node_process::{spawn_base_node, spawn_base_node_with_config, BaseNodeProcess}, + base_node_process::{get_base_dir, spawn_base_node, spawn_base_node_with_config, BaseNodeProcess}, get_peer_addresses, + get_port, merge_mining_proxy::{register_merge_mining_proxy_process, MergeMiningProxyProcess}, miner::{ mine_block, @@ -147,6 +150,7 @@ pub struct TariWorld { miners: IndexMap, ffi_wallets: IndexMap, wallets: IndexMap, + chat_clients: IndexMap, merge_mining_proxies: IndexMap, transactions: IndexMap, wallet_addresses: IndexMap, // values are strings representing tari addresses @@ -277,10 +281,11 @@ impl TariWorld { pub async fn after(&mut self, _scenario: &Scenario) { self.base_nodes.clear(); - self.seed_nodes.clear(); - self.wallets.clear(); + self.chat_clients.clear(); self.ffi_wallets.clear(); self.miners.clear(); + self.seed_nodes.clear(); + self.wallets.clear(); } } @@ -4980,6 +4985,107 @@ async fn merge_mining_ask_for_block_header_by_hash(world: &mut TariWorld, mining world.last_merge_miner_response = merge_miner.get_block_header_by_hash(hash).await; } +#[when(expr = "I have a chat client {word} connected to seed node {word}")] +async fn chat_client_connected_to_base_node(world: &mut TariWorld, name: String, seed_node_name: String) { + let base_node = world.get_node(&seed_node_name).unwrap(); + + let port = get_port(18000..18499).unwrap(); + let temp_dir_path = get_base_dir() + .join("chat_clients") + .join(format!("port_{}", port)) + .join(name.clone()); + let address = Multiaddr::from_str(&format!("/ip4/127.0.0.1/tcp/{}", port)).unwrap(); + let identity = NodeIdentity::random(&mut OsRng, address, PeerFeatures::COMMUNICATION_NODE); + + let mut client = Client::new( + identity, + vec![base_node.identity.to_peer()], + temp_dir_path, + Network::LocalNet, + ); + client.initialize().await; + + world.chat_clients.insert(name, client); +} + +#[when(expr = "I have a chat client {word} with no peers")] +async fn chat_client_with_no_peers(world: &mut TariWorld, name: String) { + let port = get_port(18000..18499).unwrap(); + let temp_dir_path = get_base_dir() + .join("chat_clients") + .join(format!("port_{}", port)) + .join(name.clone()); + let address = Multiaddr::from_str(&format!("/ip4/127.0.0.1/tcp/{}", port)).unwrap(); + let identity = NodeIdentity::random(&mut OsRng, address, PeerFeatures::COMMUNICATION_NODE); + + let mut client = Client::new(identity, vec![], temp_dir_path, Network::LocalNet); + client.initialize().await; + + world.chat_clients.insert(name, client); +} + +#[when(regex = r"^I use (.+) to send a message '(.+)' to (.*)$")] +async fn send_message_to(world: &mut TariWorld, sender: String, message: String, receiver: String) { + let sender = world.chat_clients.get(&sender).unwrap(); + let receiver = world.chat_clients.get(&receiver).unwrap(); + let address = TariAddress::from_public_key(receiver.identity.public_key(), Network::LocalNet); + + sender.send_message(address, message).await; +} + +#[then(expr = "{word} will have {int} message(s) with {word}")] +async fn receive_n_messages(world: &mut TariWorld, receiver: String, message_count: u64, sender: String) { + let receiver: &Client = world.chat_clients.get(&receiver).unwrap(); + let sender = world.chat_clients.get(&sender).unwrap(); + let address = TariAddress::from_public_key(sender.identity.public_key(), Network::LocalNet); + + let mut messages = vec![]; + for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + messages = receiver.get_messages(&address).await; + + if messages.len() as u64 == message_count { + return; + } + + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + } + + panic!( + "Receiver {} only received {}/{} messages", + receiver.identity.node_id(), + messages.len(), + message_count + ) +} + +#[when(expr = "{word} adds {word} as a contact")] +async fn add_as_contact(world: &mut TariWorld, sender: String, receiver: String) { + let receiver: &Client = world.chat_clients.get(&receiver).unwrap(); + let sender: &Client = world.chat_clients.get(&sender).unwrap(); + + let address = TariAddress::from_public_key(receiver.identity.public_key(), Network::LocalNet); + + sender.add_contact(&address).await; +} + +#[when(expr = "{word} waits for contact {word} to be online")] +async fn wait_for_contact_to_be_online(world: &mut TariWorld, client: String, contact: String) { + let client: &Client = world.chat_clients.get(&client).unwrap(); + let contact: &Client = world.chat_clients.get(&contact).unwrap(); + + let address = TariAddress::from_public_key(contact.identity.public_key(), Network::LocalNet); + + for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + if ContactOnlineStatus::Online == client.check_online_status(&address).await { + return; + } + + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + } + + panic!("Contact {} never came online", contact.identity.node_id(),) +} + fn flush_stdout(buffer: &Arc>>) { // After each test we flush the stdout to the logs. info!( diff --git a/integration_tests/tests/features/Chat.feature b/integration_tests/tests/features/Chat.feature new file mode 100644 index 0000000000..f18d34031f --- /dev/null +++ b/integration_tests/tests/features/Chat.feature @@ -0,0 +1,50 @@ +# Copyright 2023 The Tari Project +# SPDX-License-Identifier: BSD-3-Clause + +Feature: Chat messaging + + Scenario: A message is propagated between nodes via 3rd party + Given I have a seed node SEED_A + When I have a chat client CHAT_A connected to seed node SEED_A + When I have a chat client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + Then CHAT_B will have 1 message with CHAT_A + + Scenario: A message is sent directly between nodes + Given I have a seed node SEED_A + When I have a chat client CHAT_A connected to seed node SEED_A + When I have a chat client CHAT_B connected to seed node SEED_A + When CHAT_A adds CHAT_B as a contact + When I stop node SEED_A + When CHAT_A waits for contact CHAT_B to be online + When I use CHAT_A to send a message 'Hey there' to CHAT_B + Then CHAT_B will have 1 message with CHAT_A + + Scenario: Message counts are distinct + Given I have a seed node SEED_A + When I have a chat client CHAT_A connected to seed node SEED_A + When I have a chat client CHAT_B connected to seed node SEED_A + When I have a chat client CHAT_C connected to seed node SEED_A + + When CHAT_A adds CHAT_B as a contact + When CHAT_A adds CHAT_C as a contact + When CHAT_B adds CHAT_C as a contact + When CHAT_C adds CHAT_B as a contact + When I stop node SEED_A + + When I use CHAT_A to send a message 'Message 1 from a to b' to CHAT_B + When I use CHAT_A to send a message 'Message 2 from a to b' to CHAT_B + When I use CHAT_A to send a message 'Message 1 from a to c' to CHAT_C + + When I use CHAT_B to send a message 'Message 1 from b to c' to CHAT_C + When I use CHAT_B to send a message 'Message 2 from b to c' to CHAT_C + + When I use CHAT_C to send a message 'Message 1 from c to b' to CHAT_B + + Then CHAT_B will have 2 messages with CHAT_A + Then CHAT_B will have 3 messages with CHAT_C + Then CHAT_C will have 1 messages with CHAT_A + Then CHAT_C will have 3 messages with CHAT_B + Then CHAT_A will have 2 messages with CHAT_B + Then CHAT_A will have 1 messages with CHAT_C + diff --git a/integration_tests/tests/utils/base_node_process.rs b/integration_tests/tests/utils/base_node_process.rs index b96b0e56cc..930a3bd2bb 100644 --- a/integration_tests/tests/utils/base_node_process.rs +++ b/integration_tests/tests/utils/base_node_process.rs @@ -46,6 +46,7 @@ use crate::{ TariWorld, }; +#[derive(Clone)] pub struct BaseNodeProcess { pub name: String, pub port: u64, @@ -180,11 +181,7 @@ pub async fn spawn_base_node_with_config( .tcp .listener_address .clone()]); - // base_node_config.base_node.p2p.datastore_path = temp_dir_path.to_path_buf(); - // base_node_config.base_node.p2p.peer_database_name = "peer_db.mdb".to_string(); base_node_config.base_node.p2p.dht = DhtConfig::default_local_test(); - // base_node_config.base_node.p2p.dht.database_url = - // DbConnectionUrl::File(temp_dir_path.clone().join("dht.sqlite")); base_node_config.base_node.p2p.dht.network_discovery.enabled = true; base_node_config.base_node.p2p.allow_test_addresses = true; base_node_config.base_node.storage.orphan_storage_capacity = 10; diff --git a/integration_tests/tests/utils/chat_client/Cargo.toml b/integration_tests/tests/utils/chat_client/Cargo.toml new file mode 100644 index 0000000000..e3bf6ebbfc --- /dev/null +++ b/integration_tests/tests/utils/chat_client/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "tari_chat_client" +authors = ["The Tari Development Community"] +description = "Tari cucumber chat client" +license = "BSD-3-Clause" +version = "0.48.0-pre.1" +edition = "2018" + +[dependencies] +tari_common = { path = "../../../../common" } +tari_common_sqlite = { path = "../../../../common_sqlite" } +tari_common_types = { path = "../../../../base_layer/common_types" } +tari_comms = { path = "../../../../comms/core" } +tari_comms_dht = { path = "../../../../comms/dht" } +tari_contacts = { path = "../../../../base_layer/contacts" } +tari_p2p = { path = "../../../../base_layer/p2p" } +tari_service_framework= { path = "../../../../base_layer/service_framework" } +tari_shutdown = { path = "../../../../infrastructure/shutdown" } +tari_storage = { path = "../../../../infrastructure/storage" } +tari_test_utils = { path = "../../../../infrastructure/test_utils" } + +anyhow = "1.0.41" +diesel = { version = "2.0.3", features = ["sqlite", "r2d2", "serde_json", "chrono", "64-column-tables"] } +lmdb-zero = "0.4.4" diff --git a/integration_tests/tests/utils/chat_client/src/client.rs b/integration_tests/tests/utils/chat_client/src/client.rs new file mode 100644 index 0000000000..e4536673b8 --- /dev/null +++ b/integration_tests/tests/utils/chat_client/src/client.rs @@ -0,0 +1,171 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{ + fmt::{Debug, Formatter}, + path::PathBuf, + sync::Arc, + time::Duration, +}; + +use tari_common_types::tari_address::TariAddress; +use tari_comms::{peer_manager::Peer, CommsNode, NodeIdentity}; +use tari_contacts::contacts_service::{ + handle::ContactsServiceHandle, + service::ContactOnlineStatus, + types::{Message, MessageBuilder}, +}; +use tari_p2p::Network; +use tari_shutdown::Shutdown; + +use crate::{database, networking}; + +#[derive(Clone)] +pub struct Client { + pub base_dir: PathBuf, + pub contacts: Option, + pub identity: Arc, + pub network: Network, + pub seed_peers: Vec, + pub shutdown: Shutdown, +} + +impl Debug for Client { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Client") + .field("base_dir", &self.base_dir) + .field("identity", &self.identity) + .field("network", &self.network) + .field("seed_peers", &self.seed_peers) + .field("shutdown", &self.shutdown) + .finish() + } +} + +impl Drop for Client { + fn drop(&mut self) { + self.quit(); + } +} + +impl Client { + pub fn new(identity: NodeIdentity, seed_peers: Vec, base_dir: PathBuf, network: Network) -> Self { + Self { + identity: Arc::new(identity), + base_dir, + seed_peers, + shutdown: Shutdown::new(), + contacts: None, + network, + } + } + + pub async fn add_contact(&self, address: &TariAddress) { + if let Some(mut contacts_service) = self.contacts.clone() { + contacts_service + .upsert_contact(address.into()) + .await + .expect("Contact wasn't added"); + } + } + + pub async fn check_online_status(&self, address: &TariAddress) -> ContactOnlineStatus { + if let Some(mut contacts_service) = self.contacts.clone() { + let contact = contacts_service + .get_contact(address.clone()) + .await + .expect("Client does not have contact"); + + return contacts_service + .get_contact_online_status(contact) + .await + .expect("Failed to get status"); + } + + ContactOnlineStatus::Offline + } + + pub async fn send_message(&self, receiver: TariAddress, message: String) { + if let Some(mut contacts_service) = self.contacts.clone() { + contacts_service + .send_message(MessageBuilder::new().message(message).address(receiver).build()) + .await + .expect("Message wasn't sent"); + } + } + + pub async fn get_messages(&self, sender: &TariAddress) -> Vec { + let mut messages = vec![]; + if let Some(mut contacts_service) = self.contacts.clone() { + messages = contacts_service + .get_all_messages(sender.clone()) + .await + .expect("Messages not fetched"); + } + + messages + } + + pub async fn initialize(&mut self) { + println!("initializing chat"); + + let signal = self.shutdown.to_signal(); + let db = database::create_chat_storage(self.base_dir.clone()).unwrap(); + + let (contacts, comms_node) = networking::start( + self.identity.clone(), + self.base_dir.clone(), + self.seed_peers.clone(), + self.network, + db, + signal, + ) + .await + .unwrap(); + + if !self.seed_peers.is_empty() { + loop { + println!("Waiting for peer connections..."); + match wait_for_connectivity(comms_node.clone()).await { + Ok(_) => break, + Err(e) => println!("{}. Still waiting...", e), + } + } + } + + self.contacts = Some(contacts); + + println!("Connections established") + } + + pub fn quit(&mut self) { + self.shutdown.trigger(); + } +} + +pub async fn wait_for_connectivity(comms: CommsNode) -> anyhow::Result<()> { + comms + .connectivity() + .wait_for_connectivity(Duration::from_secs(30)) + .await?; + Ok(()) +} diff --git a/integration_tests/tests/utils/chat_client/src/database.rs b/integration_tests/tests/utils/chat_client/src/database.rs new file mode 100644 index 0000000000..3b2d0f8215 --- /dev/null +++ b/integration_tests/tests/utils/chat_client/src/database.rs @@ -0,0 +1,55 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{convert::TryInto, path::PathBuf}; + +use diesel::{Connection, SqliteConnection}; +use tari_common_sqlite::{ + connection::{DbConnection, DbConnectionUrl}, + error::StorageError, +}; +use tari_storage::lmdb_store::{LMDBBuilder, LMDBConfig}; +use tari_test_utils::random::string; + +pub fn create_chat_storage(base_path: PathBuf) -> Result { + std::fs::create_dir_all(&base_path).unwrap(); + let db_name = format!("{}.sqlite3", string(8).as_str()); + let db_path = format!("{}/{}", base_path.to_str().unwrap(), db_name); + let url: DbConnectionUrl = db_path.clone().try_into().unwrap(); + + // Create the db + let _db = SqliteConnection::establish(&db_path).unwrap_or_else(|_| panic!("Error connecting to {}", db_path)); + + DbConnection::connect_url(&url) +} + +pub fn create_peer_storage(base_path: PathBuf) { + std::fs::create_dir_all(&base_path).unwrap(); + + LMDBBuilder::new() + .set_path(&base_path) + .set_env_config(LMDBConfig::default()) + .set_max_number_of_databases(1) + .add_database("peerdb", lmdb_zero::db::CREATE) + .build() + .unwrap(); +} diff --git a/integration_tests/tests/utils/chat_client/src/lib.rs b/integration_tests/tests/utils/chat_client/src/lib.rs new file mode 100644 index 0000000000..561f08a5df --- /dev/null +++ b/integration_tests/tests/utils/chat_client/src/lib.rs @@ -0,0 +1,27 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +mod client; +pub use client::Client; + +pub mod database; +pub mod networking; diff --git a/integration_tests/tests/utils/chat_client/src/networking.rs b/integration_tests/tests/utils/chat_client/src/networking.rs new file mode 100644 index 0000000000..4dd0cb3640 --- /dev/null +++ b/integration_tests/tests/utils/chat_client/src/networking.rs @@ -0,0 +1,145 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{path::PathBuf, sync::Arc, time::Duration}; + +use tari_common::configuration::MultiaddrList; +use tari_common_sqlite::connection::DbConnection; +// Re-exports +pub use tari_comms::{ + multiaddr::Multiaddr, + peer_manager::{NodeIdentity, PeerFeatures}, +}; +use tari_comms::{peer_manager::Peer, CommsNode, UnspawnedCommsNode}; +use tari_comms_dht::{store_forward::SafConfig, DhtConfig, NetworkDiscoveryConfig}; +use tari_contacts::contacts_service::{ + handle::ContactsServiceHandle, + storage::sqlite_db::ContactsServiceSqliteDatabase, + ContactsServiceInitializer, +}; +use tari_p2p::{ + comms_connector::pubsub_connector, + initialization::{spawn_comms_using_transport, P2pInitializer}, + services::liveness::{LivenessConfig, LivenessInitializer}, + Network, + P2pConfig, + PeerSeedsConfig, + TcpTransportConfig, + TransportConfig, +}; +use tari_service_framework::StackBuilder; +use tari_shutdown::ShutdownSignal; + +use crate::database; + +pub async fn start( + node_identity: Arc, + base_path: PathBuf, + seed_peers: Vec, + network: Network, + db: DbConnection, + shutdown_signal: ShutdownSignal, +) -> anyhow::Result<(ContactsServiceHandle, CommsNode)> { + database::create_peer_storage(base_path.clone()); + let backend = ContactsServiceSqliteDatabase::init(db); + + let (publisher, subscription_factory) = pubsub_connector(100, 50); + let in_msg = Arc::new(subscription_factory); + + let transport_config = TransportConfig::new_tcp(TcpTransportConfig { + listener_address: node_identity.first_public_address(), + ..TcpTransportConfig::default() + }); + + let mut config = P2pConfig { + datastore_path: base_path.clone(), + dht: DhtConfig { + network_discovery: NetworkDiscoveryConfig { + enabled: true, + ..NetworkDiscoveryConfig::default() + }, + saf: SafConfig { + auto_request: true, + ..Default::default() + }, + ..DhtConfig::default_local_test() + }, + transport: transport_config.clone(), + allow_test_addresses: true, + public_addresses: MultiaddrList::from(vec![node_identity.first_public_address()]), + user_agent: "tari/chat-client/0.0.1".to_string(), + ..P2pConfig::default() + }; + config.set_base_path(base_path.clone()); + + let seed_config = PeerSeedsConfig { + peer_seeds: seed_peers + .iter() + .map(|p| format!("{}::{}", p.public_key, p.addresses.best().unwrap().address())) + .collect::>() + .into(), + ..PeerSeedsConfig::default() + }; + + let fut = StackBuilder::new(shutdown_signal) + .add_initializer(P2pInitializer::new( + config, + seed_config, + network, + node_identity, + publisher, + )) + .add_initializer(LivenessInitializer::new( + LivenessConfig { + auto_ping_interval: Some(Duration::from_secs(1)), + num_peers_per_round: 0, // No random peers + max_allowed_ping_failures: 0, // Peer with failed ping-pong will never be removed + ..Default::default() + }, + in_msg.clone(), + )) + .add_initializer(ContactsServiceInitializer::new( + backend, + in_msg, + Duration::from_secs(5), + 2, + )) + .build(); + + let mut handles = fut.await.expect("Service initialization failed"); + + let comms = handles + .take_handle::() + .expect("P2pInitializer was not added to the stack or did not add UnspawnedCommsNode"); + + let peer_manager = comms.peer_manager(); + for peer in seed_peers { + peer_manager.add_peer(peer).await?; + } + + let comms = spawn_comms_using_transport(comms, transport_config).await.unwrap(); + handles.register(comms); + + let comms = handles.expect_handle::(); + let contacts = handles.expect_handle::(); + Ok((contacts, comms)) +}