Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Noah-Wilderom committed Apr 29, 2024
1 parent 2ef78d0 commit d1a7ae7
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 67 deletions.

This file was deleted.

24 changes: 18 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package main

import (
"bytes"
"fmt"
"io"
"log"
"os"
"time"
Expand All @@ -22,7 +22,7 @@ func makeServer(listenAddr string, nodes ...string) *FileServer {
tcpTransport := p2p.NewTCPTransport(tcpTransportOpts)

fileServerOpts := FileServerOpts{
StorageRoot: listenAddr + "_network",
StorageRoot: listenAddr[1:] + "_network",
PathTransformFunc: CASPathTransformFunc,
Transport: tcpTransport,
BootstrapNodes: nodes,
Expand All @@ -48,13 +48,25 @@ func main() {
log.Fatal(s1.Start())
}()

time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)

go s2.Start()
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)

data := bytes.NewReader([]byte("my big data file here"))
s2.StoreData("myprivatekey", data)
// data := bytes.NewReader([]byte("my big data file here"))
// s2.Store("myprivatekey", data)

r, err := s2.Get("myprivatekey")
if err != nil {
log.Fatal(err)
}

b, err := io.ReadAll(r)
if err != nil {
log.Fatal(err)
}

fmt.Println(string(b))

select {}
}
192 changes: 140 additions & 52 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"log"
"sync"
"time"

"github.com/Noah-Wilderom/foreverstore/p2p"
)
Expand Down Expand Up @@ -42,7 +43,7 @@ func NewFileServer(opts FileServerOpts) *FileServer {
}
}

func (s *FileServer) broadcast(msg *Message) error {
func (s *FileServer) stream(msg *Message) error {
peers := []io.Writer{}
for _, peer := range s.peers {
peers = append(peers, peer)
Expand All @@ -52,53 +53,103 @@ func (s *FileServer) broadcast(msg *Message) error {
return gob.NewEncoder(mw).Encode(msg)
}

func (s *FileServer) broadcast(msg *Message) error {
buf := new(bytes.Buffer)
if err := gob.NewEncoder(buf).Encode(msg); err != nil {
return err
}

for _, peer := range s.peers {
if err := peer.Send(buf.Bytes()); err != nil {
return err
}
}

return nil
}

type Message struct {
Payload any
}

type MessageStoreFile struct {
Key string
Size int64
}

type MessageGetFile struct {
Key string
}

func (s *FileServer) StoreData(key string, r io.Reader) error {
// 1. Store this file to disk
// 2. broadcast this file to all known peers in the network
func (s *FileServer) Get(key string) (io.Reader, error) {
if s.store.Has(key) {
return s.store.Read(key)
}

fmt.Printf("dont have file (%s) locally, fetching from network...\n", key)

buf := new(bytes.Buffer)
msg := Message{
Payload: MessageStoreFile{
Payload: MessageGetFile{
Key: key,
},
}

if err := gob.NewEncoder(buf).Encode(msg); err != nil {
if err := s.broadcast(&msg); err != nil {
return nil, err
}

for _, peer := range s.peers {
fmt.Println("receiving stream from peer:", peer.RemoteAddr().String())
fileBuffer := new(bytes.Buffer)
n, err := io.Copy(fileBuffer, peer)
if err != nil {
return nil, err
}

fmt.Println("received bytes over the network:", n)
fmt.Println(fileBuffer.String())
}

select {}

return nil, nil
}

func (s *FileServer) Store(key string, r io.Reader) error {
var (
fileBuffer = new(bytes.Buffer)
tee = io.TeeReader(r, fileBuffer)
)

size, err := s.store.Write(key, tee)
if err != nil {
return err
}

msg := Message{
Payload: MessageStoreFile{
Key: key,
Size: size,
},
}

if err := s.broadcast(&msg); err != nil {
return err
}

time.Sleep(time.Second * 3)

// TODO: use a multiwriter here.
for _, peer := range s.peers {
if err := peer.Send(buf.Bytes()); err != nil {
n, err := io.Copy(peer, fileBuffer)
if err != nil {
return err
}

fmt.Println("received and written bytes to disk: ", n)
}

return nil

// buf := new(bytes.Buffer)
// tee := io.TeeReader(r, buf)
//
// if err := s.store.Write(key, tee); err != nil {
// return err
// }
//
// p := &DataMessage{
// Key: key,
// Data: buf.Bytes(),
// }
//
// return s.broadcast(&Message{
// From: "todo",
// Payload: p,
// })
}

func (s *FileServer) Stop() {
Expand All @@ -118,7 +169,7 @@ func (s *FileServer) OnPeer(p p2p.Peer) error {

func (s *FileServer) loop() {
defer func() {
log.Println("file server stopped due to user quit action")
log.Println("file server stopped due to error or user quit action")
s.Transport.Close()
}()

Expand All @@ -127,42 +178,74 @@ func (s *FileServer) loop() {
case rpc := <-s.Transport.Consume():
var msg Message
if err := gob.NewDecoder(bytes.NewReader(rpc.Payload)).Decode(&msg); err != nil {
log.Println(err)
log.Println("decoding error:", err)
}

fmt.Printf("payload: %+v\n", msg.Payload)

peer, ok := s.peers[rpc.From]
if !ok {
panic("peer not found in peer map")
if err := s.handleMessage(rpc.From, &msg); err != nil {
log.Println("handle message error:", err)
}
case <-s.quitch:
return
}
}
}

buf := make([]byte, 1000)
if _, err := peer.Read(buf); err != nil {
panic(err)
}
func (s *FileServer) handleMessage(from string, msg *Message) error {
switch v := msg.Payload.(type) {
case MessageStoreFile:
return s.handleMessageStoreFile(from, v)
case MessageGetFile:
return s.handleMessageGetFile(from, v)
}

fmt.Printf("%s\n", string(buf))
return nil
}

peer.(*p2p.TCPPeer).Wg.Done()
func (s *FileServer) handleMessageGetFile(from string, msg MessageGetFile) error {
if !s.store.Has(msg.Key) {
return fmt.Errorf("need to serve file (%s) but it does not exists on disk", msg.Key)
}

// if err := s.handleMessage(&m); err != nil {
// log.Println(err)
//}
case <-s.quitch:
return
}
fmt.Printf("serving file (%s) over the network\n", msg.Key)
r, err := s.store.Read(msg.Key)
if err != nil {
return err
}

peer, ok := s.peers[from]

if !ok {
return fmt.Errorf("peer (%s) could not be found in the peer list", from)
}

n, err := io.Copy(peer, r)
if err != nil {
return err
}

fmt.Printf("written (%d) bytes over the network to %s\n", n, from)

return nil
}

//func (s *FileServer) handleMessage(msg *Message) error {
// switch v := msg.Payload.(type) {
// case *DataMessage:
// fmt.Printf("received data %+v\n", v)
// }
//
// return nil
//}
func (s *FileServer) handleMessageStoreFile(from string, msg MessageStoreFile) error {
peer, ok := s.peers[from]

if !ok {
return fmt.Errorf("peer (%s) could not be found in the peer list", from)
}

n, err := s.store.Write(msg.Key, io.LimitReader(peer, msg.Size))
if err != nil {
return err
}

log.Printf("written (%d) bytes to disk\n", n)

peer.(*p2p.TCPPeer).Wg.Done()

return nil
}

func (s *FileServer) bootstrapNetwork() error {
for _, addr := range s.BootstrapNodes {
Expand Down Expand Up @@ -192,3 +275,8 @@ func (s *FileServer) Start() error {

return nil
}

func init() {
gob.Register(MessageStoreFile{})
gob.Register(MessageGetFile{})
}
14 changes: 6 additions & 8 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *Store) Delete(key string) error {
return os.RemoveAll(firstPathNameWithRoot)
}

func (s *Store) Write(key string, r io.Reader) error {
func (s *Store) Write(key string, r io.Reader) (int64, error) {
return s.writeStream(key, r)
}

Expand All @@ -141,26 +141,24 @@ func (s *Store) readStream(key string) (io.ReadCloser, error) {
return os.Open(fullPathWithRoot)
}

func (s *Store) writeStream(key string, r io.Reader) error {
func (s *Store) writeStream(key string, r io.Reader) (int64, error) {
pathKey := s.PathTransformFunc(key)
pathNameWithRoot := fmt.Sprintf("%s/%s", s.Root, pathKey.PathName)
if err := os.MkdirAll(pathNameWithRoot, os.ModePerm); err != nil {
return err
return 0, err
}

fullPathWithRoot := fmt.Sprintf("%s/%s", s.Root, pathKey.FullPath())

f, err := os.Create(fullPathWithRoot)
if err != nil {
return err
return 0, err
}

n, err := io.Copy(f, r)
if err != nil {
return err
return 0, err
}

log.Printf("written (%d) bytes to disk: %s", n, fullPathWithRoot)

return nil
return n, nil
}

0 comments on commit d1a7ae7

Please sign in to comment.