Skip to content

Commit

Permalink
Read shard (#30)
Browse files Browse the repository at this point in the history
* Updated announce msg and announce functionality

* fix tests

* Create test.yaml

* Update test.yaml

* Update test.yaml

* Update test.yaml

* Create autoformat.yaml

* test diffs

* sample diff

* Auto Format Changes

* force Ipv6 strict typing everywhere (#26)

* use ipv6 container

* use ipv6 everywhere

* Auto Format Changes

---------

Co-authored-by: Format Bot <>

* Start Info Server (#27)

* implement announce shard request

* start info server

* cleanup

* fix compile

* finish info

* Auto Format Changes

---------

Co-authored-by: Format Bot <>

* Add some Info server tests (#28)

* implement test_client

* test fails

* Auto Format Changes

* fix tests

* finish info test

* Auto Format Changes

---------

Co-authored-by: Format Bot <>

* Using sockAddV6

* Added announcement for write_shard

* updated announcement for write_shard

---------

Co-authored-by: Daniel Sudzilouski <[email protected]>
Co-authored-by: Format Bot <>
  • Loading branch information
Rixhee and hazel-sudz authored Dec 14, 2024
1 parent f361f87 commit e98cd9a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 12 deletions.
3 changes: 3 additions & 0 deletions src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ async fn main() -> Result<()> {

#[cfg(test)]
mod tests {
use messages::requests::announce_shard_request::AnnounceMessageType;
use serial_test::serial;
use utils::test_client::{self, TestRouterClient};

Expand Down Expand Up @@ -245,6 +246,7 @@ mod tests {
.queue_request(
AnnounceShardRequest {
shard_type: ShardType::WriteShard,
message_type: AnnounceMessageType::NewAnnounce as u8,
ip: i,
port: i as u16,
},
Expand All @@ -258,6 +260,7 @@ mod tests {
.queue_request(
AnnounceShardRequest {
shard_type: ShardType::ReadShard,
message_type: AnnounceMessageType::NewAnnounce as u8,
ip: (j + 1) * 100,
port: ((j + 1) * 100) as u16,
},
Expand Down
22 changes: 20 additions & 2 deletions src/messages/requests/announce_shard_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@ pub enum ShardType {
WriteShard = 1,
}

#[repr(u8)]
#[derive(Debug, Clone, Copy, IntEnum, PartialEq, Eq)]
pub enum AnnounceMessageType {
NewAnnounce = 0,
ReAnnounce = 1,
}

pub struct AnnounceShardRequest {
pub shard_type: ShardType,
pub message_type: u8,
pub ip: u128,
pub port: u16,
}

/// Layout of the AnnounceShardRequest
/// | 1 byte | 16 bytes | 2 bytes |
/// | Shard Type | IP | port |
/// | 1 byte | 1 byte | 16 bytes | 2 bytes |
/// | Shard Type | message type | IP | port |
impl MessagePayload for AnnounceShardRequest {
fn is_request(&self) -> bool {
true
Expand All @@ -32,6 +40,9 @@ impl MessagePayload for AnnounceShardRequest {
// Add shard type (1 byte)
buffer.push(self.shard_type.into());

// Add message type (1 byte)
buffer.push(self.message_type);

// Add IP (16 bytes)
buffer.extend_from_slice(&self.ip.to_le_bytes());

Expand All @@ -47,6 +58,10 @@ impl MessagePayload for AnnounceShardRequest {
let shard_type = ShardType::try_from(buffer[offset]).unwrap();
offset += 1;

// Read message type (1 byte)
let message_type = buffer[offset];
offset += 1;

// Read IP (16 bytes)
let ip = u128::from_le_bytes(
<[u8; 16]>::try_from(&buffer[offset..offset + 16]).context("failed to get IP bytes")?,
Expand All @@ -60,6 +75,7 @@ impl MessagePayload for AnnounceShardRequest {

Ok(AnnounceShardRequest {
shard_type,
message_type,
ip,
port,
})
Expand All @@ -74,6 +90,7 @@ mod tests {
#[test]
fn test_roundtrip_basic() {
let original = AnnounceShardRequest {
message_type: AnnounceMessageType::NewAnnounce as u8,
shard_type: ShardType::WriteShard,
ip: 1,
port: 8080,
Expand All @@ -92,6 +109,7 @@ mod tests {
let ip: u128 = rng.gen();
let port: u16 = rng.gen();
let original = AnnounceShardRequest {
message_type: AnnounceMessageType::NewAnnounce as u8,
shard_type: ShardType::WriteShard,
ip,
port,
Expand Down
27 changes: 23 additions & 4 deletions src/read_shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ use rand::Rng;
use std::collections::HashMap;
use std::net::{Ipv6Addr, SocketAddrV6};
use std::sync::{Arc, Mutex};
use tokio::net::unix::SocketAddr;
use tokio::time;
mod io;
mod messages;
use crate::messages::{
requests::{
announce_shard_request::AnnounceShardRequest,
get_shared_peers_request::GetSharedPeersRequest, get_version_request::GetVersionRequest,
query_version_request::QueryVersionRequest, read_request::ReadRequest,
announce_shard_request::{AnnounceMessageType, AnnounceShardRequest},
get_shared_peers_request::GetSharedPeersRequest,
get_version_request::GetVersionRequest,
query_version_request::QueryVersionRequest,
read_request::ReadRequest,
},
responses::{
announce_shard_response::AnnounceShardResponse,
Expand Down Expand Up @@ -181,6 +182,23 @@ async fn main() -> Result<()> {
let read_shard_router = read_shard_server.get_handler_arc();
let reader_ip_port = read_shard_server.bind().await?;

let client0 = read_shard_server.get_router_client();
tokio::spawn(async move {
let announce_request = AnnounceShardRequest {
shard_type: ShardType::ReadShard,
message_type: AnnounceMessageType::NewAnnounce as u8,
ip: reader_ip_port.ip().to_bits(),
port: reader_ip_port.port(),
};

if let Err(e) = client0
.queue_request::<AnnounceShardRequest>(announce_request, MAIN_INSTANCE_IP_PORT)
.await
{
eprintln!("Failed to send AnnounceShardRequest: {:?}", e);
}
});

let client1 = read_shard_server.get_router_client();
tokio::spawn(async move {
let mut interval = time::interval(time::Duration::from_secs(3));
Expand All @@ -189,6 +207,7 @@ async fn main() -> Result<()> {

let announce_request = AnnounceShardRequest {
shard_type: ShardType::ReadShard,
message_type: AnnounceMessageType::ReAnnounce as u8,
ip: reader_ip_port.ip().to_bits(),
port: reader_ip_port.port(),
};
Expand Down
61 changes: 55 additions & 6 deletions src/write_shard.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use anyhow::Result;
use io::write;
use messages::requests::announce_shard_request::{AnnounceMessageType, ShardType};
use std::collections::HashMap;
use std::net::{Ipv6Addr, SocketAddrV6};
use std::sync::{Arc, Mutex};
use tokio::time;
mod messages;
use crate::messages::{
requests::{
Expand All @@ -23,6 +27,8 @@ use crate::messages::{
mod io;
use io::router::{RouterBuilder, RouterHandler};

static MAIN_INSTANCE_IP_PORT: SocketAddrV6 = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0);

#[derive(Debug)]
struct WriteShard {
data: Arc<Mutex<HashMap<String, String>>>,
Expand Down Expand Up @@ -156,11 +162,54 @@ impl RouterHandler for WriteShard {

#[tokio::main]
async fn main() -> Result<()> {
let info_router = WriteShard::new();
let mut info_server = RouterBuilder::new(info_router, None);
let write_shard_router = WriteShard::new();
let mut write_shard_server = RouterBuilder::new(write_shard_router, None);
let writer_ip_port = write_shard_server.bind().await?;

let client0 = write_shard_server.get_router_client();
tokio::spawn(async move {
info_server.listen().await?;
Ok(())
})
.await?
let announce_request = AnnounceShardRequest {
shard_type: ShardType::WriteShard,
message_type: AnnounceMessageType::NewAnnounce as u8,
ip: writer_ip_port.ip().to_bits(),
port: writer_ip_port.port(),
};

if let Err(e) = client0
.queue_request::<AnnounceShardRequest>(announce_request, MAIN_INSTANCE_IP_PORT)
.await
{
eprintln!("Failed to send AnnounceShardRequest: {:?}", e);
}
});

let client1 = write_shard_server.get_router_client();
tokio::spawn(async move {
let mut interval = time::interval(time::Duration::from_secs(3));
loop {
interval.tick().await;

let announce_request = AnnounceShardRequest {
shard_type: ShardType::WriteShard,
message_type: AnnounceMessageType::ReAnnounce as u8,
ip: writer_ip_port.ip().to_bits(),
port: writer_ip_port.port(),
};

if let Err(e) = client1
.queue_request::<AnnounceShardRequest>(announce_request, MAIN_INSTANCE_IP_PORT)
.await
{
eprintln!("Failed to send AnnounceShardRequest: {:?}", e);
}
}
});

tokio::spawn(async move {
if let Err(e) = write_shard_server.listen().await {
eprintln!("Server failed: {:?}", e);
}
});

Ok(())
}

0 comments on commit e98cd9a

Please sign in to comment.