This repository has been archived by the owner on Mar 1, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 12
Minimal separate process downloading blocks from Byron node #347
Open
paweljakubas
wants to merge
1
commit into
develop
Choose a base branch
from
paweljakubas/330/adding-lean-decoupled-block-listener
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
{-# LANGUAGE DeriveDataTypeable #-} | ||
{-# LANGUAGE DeriveGeneric #-} | ||
{-# LANGUAGE FlexibleInstances #-} | ||
{-# LANGUAGE GADTs #-} | ||
{-# LANGUAGE MultiParamTypeClasses #-} | ||
{-# LANGUAGE OverloadedStrings #-} | ||
{-# LANGUAGE RankNTypes #-} | ||
{-# LANGUAGE RecursiveDo #-} | ||
{-# LANGUAGE ScopedTypeVariables #-} | ||
{-# LANGUAGE StandaloneDeriving #-} | ||
{-# LANGUAGE TypeApplications #-} | ||
{-# LANGUAGE TypeFamilies #-} | ||
|
||
module Main where | ||
|
||
import Control.Concurrent (forkIO, killThread, threadDelay) | ||
import Control.Concurrent.Async (forConcurrently) | ||
import Control.Exception (throwIO) | ||
import qualified Data.ByteString.Char8 as BS | ||
import Data.Functor.Contravariant (contramap) | ||
import qualified Data.List as L | ||
import Data.List.NonEmpty (NonEmpty ((:|))) | ||
import Network.Transport (EndPointAddress (..), closeTransport) | ||
import qualified Network.Transport.TCP as TCP | ||
import Prelude (getChar) | ||
import System.Random | ||
import Universum | ||
|
||
import ByronSpecifics (MsgBlock (..), MsgGetBlocks (..), | ||
MsgGetHeaders (..), MsgHeaders (..), securityParameter, | ||
usedVerInfo) | ||
|
||
import Node | ||
import Pos.Chain.Block (Block, HeaderHash, getBlockHeader, headerHash, | ||
prevBlockL) | ||
import Pos.Core.Chrono (NewestFirst (..)) | ||
import Pos.Infra.Communication.BiP (BiP, bipPacking) | ||
import Pos.Infra.Communication.Types.Protocol (PeerData) | ||
import Pos.Util.Trace (stdoutTrace) | ||
|
||
type Packing = BiP | ||
|
||
data HeaderData = HeaderData { | ||
headerHashToBeProcessed :: Maybe HeaderHash | ||
, headerHashesConsumed :: [HeaderHash] | ||
} deriving (Show, Eq) | ||
|
||
-- the wallet is not going to be redirecting information, just being the consumer in a converse with the node | ||
noListener | ||
:: NodeId | ||
-> PeerData | ||
-> [Listener Packing PeerData] | ||
noListener _ _ = [] | ||
|
||
|
||
queryWorker | ||
:: NodeId | ||
-> [NodeId] | ||
-> IORef HeaderData | ||
-> Converse Packing PeerData | ||
-> IO () | ||
queryWorker anId peerIds headerData = syncLoop | ||
where | ||
syncLoop | ||
:: Converse Packing PeerData | ||
-> IO () | ||
syncLoop converse = loop | ||
where | ||
loop :: IO () | ||
loop = do | ||
threadDelay 3000000 | ||
let getCurrentHeader | ||
:: NodeId | ||
-> ConversationActions MsgGetHeaders MsgHeaders | ||
-> IO () | ||
getCurrentHeader peerId cactions = do | ||
send cactions (MsgGetHeaders [] Nothing) | ||
received <- recv cactions maxBound | ||
case received of | ||
Just (MsgHeaders (NewestFirst (tip:|[]))) -> do | ||
let newHeaderHash = headerHash tip | ||
(HeaderData _ headerConsumed) <- readIORef headerData | ||
writeIORef headerData $ HeaderData (Just newHeaderHash) headerConsumed | ||
Just text -> putTextLn $ show anId <> " no headers from " <> show peerId <> " data:" <> show text | ||
Nothing -> error "getCurrentHeader Unexpected end of input" | ||
getBlocks | ||
:: NodeId | ||
-> ConversationActions MsgGetBlocks MsgBlock | ||
-> IO () | ||
getBlocks peerId cactions = do | ||
(HeaderData headerM pulledBlockHeaders) <- readIORef headerData | ||
case headerM of | ||
Nothing -> return () | ||
Just headerTip -> do | ||
case headerTip `L.elem` pulledBlockHeaders of | ||
True -> return () | ||
False -> do | ||
blocks <- getBlock peerId cactions headerTip pulledBlockHeaders [] | ||
let headersToAdd = map (headerHash . getBlockHeader) blocks | ||
writeIORef headerData $ HeaderData (Just headerTip) (L.take securityParameter $ headersToAdd ++ pulledBlockHeaders) | ||
getBlock | ||
:: NodeId | ||
-> ConversationActions MsgGetBlocks MsgBlock | ||
-> HeaderHash | ||
-> [HeaderHash] | ||
-> [Block] | ||
-> IO [Block] | ||
getBlock peerId cactions currentHeader consumedHeaders blocks = do | ||
send cactions (MsgGetBlocks currentHeader currentHeader) | ||
received <- recv cactions maxBound | ||
case received of | ||
Just (MsgBlock block@(Right _)) -> do | ||
-- main block | ||
putTextLn "downloaded main block from trusted node :" | ||
putTextLn $ show block | ||
let previousHeader = block ^. prevBlockL | ||
case ((previousHeader `L.elem` consumedHeaders) || (L.length consumedHeaders ==0)) of | ||
True -> | ||
pure $ block : blocks | ||
False -> | ||
getBlock peerId cactions previousHeader consumedHeaders (block : blocks) | ||
Just (MsgBlock block@(Left _)) -> do | ||
-- genesis block | ||
putTextLn "downloaded genesis block from trusted node" | ||
let previousHeader = block ^. prevBlockL | ||
case ((previousHeader `L.elem` consumedHeaders) || (L.length consumedHeaders ==0)) of | ||
True -> | ||
pure $ blocks | ||
False -> | ||
getBlock peerId cactions previousHeader consumedHeaders blocks | ||
Just (MsgNoBlock txt) -> do | ||
putTextLn $ show txt | ||
pure blocks | ||
_ -> | ||
pure blocks | ||
_ <- forConcurrently peerIds $ \peerId -> do | ||
converseWith converse peerId (\_ -> Conversation (getCurrentHeader peerId)) | ||
converseWith converse peerId (\_ -> Conversation (getBlocks peerId)) | ||
loop | ||
|
||
|
||
main :: IO () | ||
main = do | ||
|
||
let params = TCP.defaultTCPParameters { TCP.tcpCheckPeerHost = True } | ||
transport <- do | ||
transportOrError <- | ||
TCP.createTransport (TCP.defaultTCPAddr "127.0.0.1" "3005") params | ||
either throwIO return transportOrError | ||
|
||
let prng1 = mkStdGen 0 | ||
|
||
let convEstablishTimeout = 30000000 | ||
let nodeEnv = defaultNodeEnvironment { nodeAckTimeout = convEstablishTimeout } | ||
|
||
-- this is the trusted node we are going to talk with | ||
let peerNodeId = NodeId $ EndPointAddress $ BS.pack "127.0.0.1:3001:0" | ||
|
||
-- later it would be read from persisted storage | ||
initialHeaderData <- newIORef $ HeaderData Nothing [] | ||
|
||
putTextLn "Starting decoupled block listener" | ||
node (contramap snd stdoutTrace) (simpleNodeEndPoint transport) (const noReceiveDelay) (const noReceiveDelay) | ||
prng1 bipPacking usedVerInfo nodeEnv $ \theNode -> | ||
NodeAction (noListener . nodeId $ theNode) $ \converse -> do | ||
tid1 <- forkIO $ queryWorker (nodeId theNode) [peerNodeId] initialHeaderData converse | ||
putTextLn "Press return to stop" | ||
_ <- getChar | ||
killThread tid1 | ||
putTextLn "Stopping requestTipWorker" | ||
|
||
|
||
--we will need to persist that | ||
finalHeaderHashes <- readIORef initialHeaderData | ||
putTextLn $ show finalHeaderHashes | ||
|
||
putTextLn "Decoupled block listener stopped." | ||
closeTransport transport |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
module ByronSpecifics where | ||
|
||
import qualified Data.HashMap.Strict as HM | ||
import Formatting (bprint, build, (%)) | ||
import qualified Formatting.Buildable | ||
import Serokell.Util.Text (listJson) | ||
import Universum | ||
|
||
import Node.Message.Class (Message (..)) | ||
import Pos.Binary.Class (Bi (..), Cons (..), Field (..), | ||
deriveSimpleBi, encodeListLen, enforceSize) | ||
import Pos.Chain.Block (Block, BlockHeader (..), HeaderHash) | ||
import Pos.Chain.Update (BlockVersion (..)) | ||
import Pos.Core.Chrono (NE, NewestFirst (..)) | ||
import Pos.Infra.Communication.Types.Protocol (HandlerSpec (..), | ||
HandlerSpecs, VerInfo (..)) | ||
import Pos.Util.Util (cborError) | ||
|
||
|
||
data MsgGetHeaders = MsgGetHeaders | ||
{ -- not guaranteed to be in any particular order | ||
mghFrom :: ![HeaderHash] | ||
, mghTo :: !(Maybe HeaderHash) | ||
} deriving (Generic, Show, Eq) | ||
|
||
instance Buildable MsgGetHeaders where | ||
build (MsgGetHeaders mghFrom' mghTo') = | ||
bprint ("MsgGetHeaders {from = "%listJson%", to = "%build%"}") | ||
mghFrom' (maybe "<Nothing>" (bprint build) mghTo') | ||
|
||
instance Message MsgGetHeaders where | ||
messageCode _ = 4 | ||
formatMessage _ = "GetHeaders" | ||
|
||
deriveSimpleBi ''MsgGetHeaders [ | ||
Cons 'MsgGetHeaders [ | ||
Field [| mghFrom :: [HeaderHash] |], | ||
Field [| mghTo :: Maybe HeaderHash |] | ||
]] | ||
|
||
-- | 'Headers' message (see protocol specification). | ||
data MsgHeaders | ||
= MsgHeaders (NewestFirst NE BlockHeader) | ||
| MsgNoHeaders Text | ||
deriving (Eq, Show, Generic) | ||
|
||
instance Bi MsgHeaders where | ||
encode = \case | ||
MsgHeaders b -> encodeListLen 2 <> encode (0 :: Word8) <> encode b | ||
MsgNoHeaders t -> encodeListLen 2 <> encode (1 :: Word8) <> encode t | ||
decode = do | ||
enforceSize "MsgHeaders" 2 | ||
tag <- decode @Word8 | ||
case tag of | ||
0 -> MsgHeaders <$> decode | ||
1 -> MsgNoHeaders <$> decode | ||
t -> cborError $ "MsgHeaders wrong tag: " <> Universum.show t | ||
|
||
instance Message MsgHeaders where | ||
messageCode _ = 5 | ||
formatMessage _ = "BlockHeaders" | ||
|
||
|
||
-- | 'GetBlocks' message (see protocol specification). | ||
data MsgGetBlocks = MsgGetBlocks | ||
{ mgbFrom :: !HeaderHash | ||
, mgbTo :: !HeaderHash | ||
} deriving (Generic, Show, Eq) | ||
|
||
instance Buildable MsgGetBlocks where | ||
build (MsgGetBlocks mgbFrom' mgbTo') = | ||
bprint ("MsgGetBlocks {from = "%build%", to = "%build%"}") | ||
mgbFrom' mgbTo' | ||
|
||
instance Message MsgGetBlocks where | ||
messageCode _ = 6 | ||
formatMessage _ = "GetBlocks" | ||
|
||
deriveSimpleBi ''MsgGetBlocks [ | ||
Cons 'MsgGetBlocks [ | ||
Field [| mgbFrom :: HeaderHash |], | ||
Field [| mgbTo :: HeaderHash |] | ||
]] | ||
|
||
-- | 'Block' message (see protocol specification). | ||
data MsgBlock | ||
= MsgBlock Block | ||
| MsgNoBlock Text | ||
deriving (Eq, Show, Generic) | ||
|
||
instance Bi MsgBlock where | ||
encode = \case | ||
MsgBlock b -> encodeListLen 2 <> encode (0 :: Word8) <> encode b | ||
MsgNoBlock t -> encodeListLen 2 <> encode (1 :: Word8) <> encode t | ||
decode = do | ||
enforceSize "MsgBlock" 2 | ||
tag <- decode @Word8 | ||
case tag of | ||
0 -> MsgBlock <$> decode | ||
1 -> MsgNoBlock <$> decode | ||
t -> cborError $ "MsgBlock wrong tag: " <> Universum.show t | ||
|
||
instance Message MsgBlock where | ||
messageCode _ = 7 | ||
formatMessage _ = "Block" | ||
|
||
|
||
|
||
blockVersion :: BlockVersion | ||
blockVersion = BlockVersion | ||
{ bvMajor = 0 | ||
, bvMinor = 0 | ||
, bvAlt = 0 | ||
} | ||
|
||
protocolMagic :: Int32 | ||
protocolMagic = 55550001 | ||
|
||
securityParameter :: Int | ||
securityParameter = 2160 | ||
|
||
ins :: HandlerSpecs | ||
ins = HM.fromList [(96,ConvHandler 67),(49,ConvHandler 92),(97,ConvHandler 61),(98,ConvHandler 55),(67,ConvHandler 96),(83,ConvHandler 0),(4,ConvHandler 5),(5,ConvHandler 4),(37,ConvHandler 94),(6,ConvHandler 7),(55,ConvHandler 98),(73,ConvHandler 95),(43,ConvHandler 93),(92,ConvHandler 49),(13,ConvHandler 0),(61,ConvHandler 97),(93,ConvHandler 43),(14,ConvHandler 0),(94,ConvHandler 37),(15,ConvHandler 16),(95,ConvHandler 73)] | ||
|
||
outs :: HandlerSpecs | ||
outs = HM.fromList [(49,ConvHandler 92),(67,ConvHandler 96),(83,ConvHandler 0),(4,ConvHandler 5),(5,ConvHandler 4),(37,ConvHandler 94),(6,ConvHandler 7),(55,ConvHandler 98),(73,ConvHandler 95),(43,ConvHandler 93),(13,ConvHandler 0),(61,ConvHandler 97),(14,ConvHandler 0),(15,ConvHandler 16)] | ||
|
||
usedVerInfo :: VerInfo | ||
usedVerInfo = VerInfo protocolMagic blockVersion ins outs |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# Decoupling cardano wallet | ||
|
||
**Executive summary** | ||
*The minimum steps needed to run separate process downloading blocks from Byron era cardano node* | ||
|
||
|
||
In the current prototype it is shown that bare networking node instance is needed in order to request current tip of the block and download corresponding block. | ||
All Byron specific configuration required to connect to the node are located in **ByronSpecifics.hs**. | ||
In order to build the process run : | ||
|
||
``` sh | ||
$ cd cardano-wallet | ||
$ stack build cardano-wallet:decoupled-wallet | ||
``` | ||
|
||
Then run cluster in another console via | ||
|
||
``` sh | ||
$ cd cardano-sl | ||
$ stack test cardano-wallet:test:integration | ||
``` | ||
|
||
The process instantiate at *127.0.0.1:3005* and tries to talk to *127.0.0.1:3001:0* (which is one of the cluster's node) | ||
In order to instantiate the process run : | ||
``` sh | ||
$ cd cardano-wallet | ||
$ stack exec -- decoupled-wallet | ||
``` |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am having a look at the PR now and will push some refactorings changes on the branch.
Note that, you've been cheating my friend 😛
The requirements said:
And... I see quite a lot of
cardano-sl-***
dependencies here 🙃 ...There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I used cardano-sl-networking node and converse primitives. They use quite sophisticated way of communication there...Other dependencies are just infra (BiP, bipPacking, PeerData, VerInfo - required to talk with the node), of course chain (Block, BlockHeader, etc for definitions), core (for NewestFirst) .
So I faced cardano-dependences vs simplicity trade-off. For me not using the abovementioned would mean very complicated, not easy to understand and error-prone code. cardano-sl-networking use not so easy, I would say cryptic, bidirectional communication protocol...
Nevertheless, I agree - in this version there are still dependencies. But it is simple on the other way