Skip to content

Commit

Permalink
Introduce a projection view for ChainStateAt
Browse files Browse the repository at this point in the history
- This removes the dependency on HeadLogic in the Chain module.
  • Loading branch information
ffakenz committed Jul 10, 2023
1 parent 8123673 commit a0d5f74
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 48 deletions.
7 changes: 4 additions & 3 deletions hydra-cluster/test/Test/DirectChainSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ import Hydra.Cluster.Fixture (
import Hydra.Cluster.Util (chainConfigFor, keysFor)
import Hydra.ContestationPeriod (ContestationPeriod)
import Hydra.Crypto (aggregate, sign)
import Hydra.HeadLogic (HeadStateEvent)
import Hydra.HeadLogic (HeadStateEvent, toChainStateEvents)
import Hydra.Ledger (IsTx (..))
import Hydra.Ledger.Cardano (Tx, genOneUTxOFor)
import Hydra.Logging (Tracer, nullTracer, showLogsOnFailure)
Expand All @@ -70,7 +70,7 @@ import Hydra.Options (
toArgNetworkId,
)
import Hydra.Party (Party)
import Hydra.Persistence (PersistenceIncremental (..))
import Hydra.Persistence (PersistenceIncremental (..), createPersistenceIncrementalView)
import Hydra.Snapshot (ConfirmedSnapshot (..), Snapshot (..))
import System.Process (proc, readCreateProcess)
import Test.QuickCheck (generate)
Expand Down Expand Up @@ -451,7 +451,8 @@ withDirectChainTest tracer config ctx action = do
atomically $ modifyTVar events (event :)
, loadAll = readTVarIO events
}
withDirectChain tracer config ctx wallet persistence initialChainState callback $ \Chain{postTx} -> do
let persistenceView = createPersistenceIncrementalView persistence toChainStateEvents
withDirectChain tracer config ctx wallet persistenceView initialChainState callback $ \Chain{postTx} -> do
action
DirectChainTest
{ postTx
Expand Down
7 changes: 4 additions & 3 deletions hydra-node/exe/hydra-node/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import Hydra.HeadLogic (
OpenState (..),
defaultTTL,
getChainState,
updateHeadState,
updateHeadState, toChainStateEvents
)
import qualified Hydra.Ledger.Cardano as Ledger
import Hydra.Ledger.Cardano.Configuration (
Expand Down Expand Up @@ -59,7 +59,7 @@ import Hydra.Options (
)
import Hydra.Persistence (
PersistenceIncremental (loadAll),
createPersistenceIncremental,
createPersistenceIncremental, createPersistenceIncrementalView,
)

newtype ParamMismatchError = ParamMismatchError String deriving (Eq, Show)
Expand Down Expand Up @@ -109,7 +109,8 @@ main = do
nodeState <- createNodeState hs
ctx <- loadChainContext chainConfig party otherParties hydraScriptsTxId
wallet <- mkTinyWallet (contramap DirectChain tracer) chainConfig
withDirectChain (contramap DirectChain tracer) chainConfig ctx wallet persistence (getChainState hs) (putEvent . OnChainEvent) $ \chain -> do
let persistenceView = createPersistenceIncrementalView persistence toChainStateEvents
withDirectChain (contramap DirectChain tracer) chainConfig ctx wallet persistenceView (getChainState hs) (putEvent . OnChainEvent) $ \chain -> do
let RunOptions{host, port, peers, nodeId} = opts
putNetworkEvent (Authenticated msg otherParty) = putEvent $ NetworkEvent defaultTTL otherParty msg
RunOptions{apiHost, apiPort} = opts
Expand Down
9 changes: 4 additions & 5 deletions hydra-node/src/Hydra/Chain/Direct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,10 @@ import Hydra.Chain.Direct.Wallet (
WalletInfoOnChain (..),
newTinyWallet,
)
import Hydra.HeadLogic (HeadStateEvent)
import Hydra.Logging (Tracer, traceWith)
import Hydra.Options (ChainConfig (..))
import Hydra.Party (Party)
import Hydra.Persistence (PersistenceIncremental)
import Hydra.Persistence (PersistenceIncrementalView)
import qualified Ouroboros.Consensus.HardFork.History as Consensus
import Ouroboros.Network.Magic (NetworkMagic (..))
import Ouroboros.Network.NodeToClient (
Expand Down Expand Up @@ -186,11 +185,11 @@ withDirectChain ::
ChainConfig ->
ChainContext ->
TinyWallet IO ->
PersistenceIncremental (HeadStateEvent Tx) IO ->
PersistenceIncrementalView ChainStateAt IO ->
-- | Last known chain state as loaded from persistence.
ChainStateAt ->
ChainComponent Tx IO a
withDirectChain tracer config ctx wallet persistence chainStateAt callback action = do
withDirectChain tracer config ctx wallet persistenceView chainStateAt callback action = do
-- Last known point on chain as loaded from persistence.
let persistedPoint = recordedAt chainStateAt
queue <- newTQueueIO
Expand All @@ -212,7 +211,7 @@ withDirectChain tracer config ctx wallet persistence chainStateAt callback actio
localChainState
(submitTx queue)

let handler = chainSyncHandler tracer callback getTimeHandle ctx localChainState persistence
let handler = chainSyncHandler tracer callback getTimeHandle ctx localChainState persistenceView
res <-
race
( handle onIOException $
Expand Down
31 changes: 8 additions & 23 deletions hydra-node/src/Hydra/Chain/Direct/Handlers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-unused-imports #-}

-- | Provide infrastructure-independent "handlers" for posting transactions and following the chain.
--
Expand Down Expand Up @@ -32,7 +33,7 @@ import Hydra.Chain (
ChainEvent (..),
ChainStateType,
PostChainTx (..),
PostTxError (..),
PostTxError (..), contestationPeriod,
)
import Hydra.Chain.Direct.State (
ChainContext,
Expand All @@ -47,7 +48,7 @@ import Hydra.Chain.Direct.State (
fanout,
getKnownUTxO,
initialize,
observeSomeTx,
observeSomeTx, contestationPeriod,
)
import qualified Hydra.Chain.Direct.State as ChainState
import Hydra.Chain.Direct.TimeHandle (TimeHandle (..))
Expand All @@ -57,10 +58,9 @@ import Hydra.Chain.Direct.Wallet (
TinyWalletLog,
)
import Hydra.ContestationPeriod (toNominalDiffTime)
import Hydra.HeadLogic (HeadStateEvent (..))
import Hydra.Ledger (ChainSlot (ChainSlot))
import Hydra.Logging (Tracer, traceWith)
import Hydra.Persistence (PersistenceIncremental, loadAll)
import Hydra.Persistence (PersistenceIncrementalView, selectAll)
import Hydra.Prelude
import Plutus.Orphans ()
import System.IO.Error (userError)
Expand Down Expand Up @@ -252,36 +252,21 @@ chainSyncHandler ::
-- | Contextual information about our chain connection.
ChainContext ->
LocalChainState m ->
PersistenceIncremental (HeadStateEvent Tx) m ->
PersistenceIncrementalView ChainStateAt m ->
-- | A chain-sync handler to use in a local-chain-sync client.
ChainSyncHandler m
chainSyncHandler tracer callback getTimeHandle ctx localChainState persistence =
chainSyncHandler tracer callback getTimeHandle ctx localChainState persistenceView =
ChainSyncHandler
{ onRollBackward
, onRollForward
}
where
LocalChainState{rollback, getLatest, pushNew} = localChainState

loadChainStateEvents = do
events <- loadAll persistence
pure $
events
& mapMaybe
( \case
HeadInitialized{newChainState} -> Just newChainState
TxCommitted{newChainState} -> Just newChainState
HeadAborted{newChainState} -> Just newChainState
HeadOpened{newChainState} -> Just newChainState
HeadClosed{newChainState} -> Just newChainState
HeadFannedOut{newChainState} -> Just newChainState
_ -> Nothing
)

onRollBackward :: ChainPoint -> m ()
onRollBackward point = do
traceWith tracer $ RolledBackward{point}
chainStateEvents <- loadChainStateEvents
chainStateEvents <- selectAll persistenceView
rolledBackChainState <- atomically $ rollback point chainStateEvents
callback Rollback{rolledBackChainState}

Expand Down Expand Up @@ -377,7 +362,7 @@ prepareTxToPost timeHandle wallet ctx cst@ChainStateAt{chainState} tx =

-- See ADR21 for context
calculateTxUpperBoundFromContestationPeriod currentTime = do
let effectiveDelay = min (toNominalDiffTime $ ChainState.contestationPeriod ctx) maxGraceTime
let effectiveDelay = min (toNominalDiffTime $ contestationPeriod ctx) maxGraceTime
let upperBoundTime = addUTCTime effectiveDelay currentTime
upperBoundSlot <- throwLeft $ slotFromUTCTime upperBoundTime
pure (upperBoundSlot, upperBoundTime)
Expand Down
10 changes: 10 additions & 0 deletions hydra-node/src/Hydra/HeadLogic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,16 @@ deriving instance (IsTx tx, IsChainState tx) => FromJSON (HeadStateEvent tx)
instance (IsTx tx, Arbitrary (ChainStateType tx)) => Arbitrary (HeadStateEvent tx) where
arbitrary = genericArbitrary

toChainStateEvents :: HeadStateEvent tx -> Maybe (ChainStateType tx)
toChainStateEvents = \case
HeadInitialized{newChainState} -> Just newChainState
TxCommitted{newChainState} -> Just newChainState
HeadAborted{newChainState} -> Just newChainState
HeadOpened{newChainState} -> Just newChainState
HeadClosed{newChainState} -> Just newChainState
HeadFannedOut{newChainState} -> Just newChainState
_ -> Nothing

updateHeadState ::
IsChainState tx =>
HeadState tx ->
Expand Down
18 changes: 18 additions & 0 deletions hydra-node/src/Hydra/Persistence.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,21 @@ createPersistenceIncremental fp = do
Left e -> throwIO $ PersistenceException e
Right decoded -> pure decoded
}

newtype PersistenceIncrementalView a m =
PersistenceIncrementalView {
selectAll :: FromJSON a => m [a]
}

createPersistenceIncrementalView ::
(Monad m, FromJSON a) =>
PersistenceIncremental a m ->
(a -> Maybe b) ->
PersistenceIncrementalView b m
createPersistenceIncrementalView PersistenceIncremental{loadAll} f =
PersistenceIncrementalView {
selectAll = do
as <- loadAll
let bs = f <$> as
return $ catMaybes bs
}
25 changes: 13 additions & 12 deletions hydra-node/test/Hydra/Chain/Direct/HandlersSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ import Hydra.Chain.Direct.State (
unsafeObserveInit,
)
import Hydra.Chain.Direct.TimeHandle (TimeHandle (slotToUTCTime), TimeHandleParams (..), genTimeParams, mkTimeHandle)
import Hydra.HeadLogic (HeadStateEvent (..))
import Hydra.HeadLogic (HeadStateEvent (..), toChainStateEvents)
import Hydra.Ledger (
ChainSlot (..),
)
import Hydra.Options (maximumNumberOfParties)
import Hydra.Persistence (PersistenceIncremental (..))
import Hydra.Persistence (PersistenceIncremental (..), createPersistenceIncrementalView)
import Hydra.Snapshot (ConfirmedSnapshot (..))
import Test.Consensus.Cardano.Generators ()
import Test.Hydra.Prelude
Expand Down Expand Up @@ -141,7 +141,7 @@ spec = do
atomically $ modifyTVar chainStateEvents (event :)
, loadAll = readTVarIO chainStateEvents
}

let persistenceView = createPersistenceIncrementalView persistence toChainStateEvents
let chainSyncCallback = \_cont -> failure "Unexpected callback"
handler =
chainSyncHandler
Expand All @@ -150,7 +150,7 @@ spec = do
(pure timeHandle)
chainContext
localChainState
persistence
persistenceView
run $ do
onRollForward handler header txs
`shouldThrow` \TimeConversionException{slotNo} -> slotNo == slot
Expand Down Expand Up @@ -188,15 +188,15 @@ spec = do
atomically $ modifyTVar chainStateEvents (event :)
, loadAll = readTVarIO chainStateEvents
}

let persistenceView = createPersistenceIncrementalView persistence toChainStateEvents
let handler =
chainSyncHandler
nullTracer
callback
(pure timeHandle)
ctx
localChainState
persistence
persistenceView
run $ do
onRollForward handler header txs
latest' <- atomically $ getLatest localChainState
Expand Down Expand Up @@ -225,15 +225,15 @@ spec = do
atomically $ modifyTVar chainStateEvents (event :)
, loadAll = readTVarIO chainStateEvents
}

let persistenceView = createPersistenceIncrementalView persistence toChainStateEvents
let handler =
chainSyncHandler
nullTracer
callback
(pure timeHandle)
chainContext
localChainState
persistence
persistenceView

-- Simulate some chain following
run $ forM_ blocks $ \(TestBlock header txs) -> do
Expand Down Expand Up @@ -266,15 +266,15 @@ spec = do
atomically $ modifyTVar chainStateEvents (event :)
, loadAll = readTVarIO chainStateEvents
}

let persistenceView = createPersistenceIncrementalView persistence toChainStateEvents
let handler =
chainSyncHandler
nullTracer
(\_ -> pure ())
(pure timeHandle)
chainContext
localChainState
persistence
persistenceView

run $ forM_ blocks $ \(TestBlock header txs) -> do
onRollForward handler header txs
Expand All @@ -297,7 +297,7 @@ spec = do
(pure timeHandle)
chainContext
resumedLocalChainState
persistence
persistenceView

(rollbackPoint, blocksAfter) <- pickBlind $ genRollbackBlocks blocks
monitor $ label $ "Rollback " <> show (length blocksAfter) <> " blocks"
Expand Down Expand Up @@ -327,7 +327,8 @@ recordEventsHandler ::
recordEventsHandler ctx cs getTimeHandle persistence = do
eventsVar <- newTVarIO []
localChainState <- newLocalChainState cs
let handler = chainSyncHandler nullTracer (recordEvents eventsVar) getTimeHandle ctx localChainState persistence
let persistenceView = createPersistenceIncrementalView persistence toChainStateEvents
let handler = chainSyncHandler nullTracer (recordEvents eventsVar) getTimeHandle ctx localChainState persistenceView
pure (handler, getEvents eventsVar)
where
getEvents = readTVarIO
Expand Down
6 changes: 4 additions & 2 deletions hydra-node/test/Hydra/Model/MockChain.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import Hydra.Crypto (HydraKey)
import Hydra.HeadLogic (
Environment (Environment, otherParties, party),
Event (..),
defaultTTL,
defaultTTL, toChainStateEvents,
)
import Hydra.Logging (Tracer)
import Hydra.Model.Payment (CardanoSigningKey (..))
Expand All @@ -60,6 +60,7 @@ import Hydra.Node (
)
import Hydra.Node.EventQueue (EventQueue (..))
import Hydra.Party (Party (..), deriveParty)
import Hydra.Persistence (createPersistenceIncrementalView)

-- | Create a mocked chain which connects nodes through 'ChainSyncHandler' and
-- 'Chain' interfaces. It calls connected chain sync handlers 'onRollForward' on
Expand Down Expand Up @@ -118,14 +119,15 @@ mockChainAndNetwork tr seedKeys cp = do
getTimeHandle
seedInput
localChainState
let persistenceView = createPersistenceIncrementalView persistence toChainStateEvents
let chainHandler =
chainSyncHandler
tr
(putEvent . OnChainEvent)
getTimeHandle
ctx
localChainState
persistence
persistenceView
let node' =
node
{ hn = createMockNetwork node nodes
Expand Down

0 comments on commit a0d5f74

Please sign in to comment.