Skip to content

Commit

Permalink
Draft proposal to handle chain-state evolution
Browse files Browse the repository at this point in the history
- Remove previous from ChainStateAt
- Rollback now requires to load all persisted events,
in order to know from which chain state it should continue processing
- Persisted events remain untouched
  • Loading branch information
ffakenz committed Jul 9, 2023
1 parent 9072baa commit 1fc3704
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 45 deletions.
14 changes: 11 additions & 3 deletions hydra-cluster/test/Test/DirectChainSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import CardanoClient (
waitForUTxO,
)
import CardanoNode (NodeLog, RunningNode (..), withCardanoNodeDevnet)
import Control.Concurrent.STM (newEmptyTMVarIO, takeTMVar)
import Control.Concurrent.STM (modifyTVar, newEmptyTMVarIO, newTVarIO, readTVarIO, takeTMVar)
import Control.Concurrent.STM.TMVar (putTMVar)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as B8
Expand Down Expand Up @@ -61,6 +61,7 @@ import Hydra.Cluster.Fixture (
import Hydra.Cluster.Util (chainConfigFor, keysFor)
import Hydra.ContestationPeriod (ContestationPeriod)
import Hydra.Crypto (aggregate, sign)
import Hydra.HeadLogic (HeadStateEvent)
import Hydra.Ledger (IsTx (..))
import Hydra.Ledger.Cardano (Tx, genOneUTxOFor)
import Hydra.Logging (Tracer, nullTracer, showLogsOnFailure)
Expand All @@ -69,6 +70,7 @@ import Hydra.Options (
toArgNetworkId,
)
import Hydra.Party (Party)
import Hydra.Persistence (PersistenceIncremental (..))
import Hydra.Snapshot (ConfirmedSnapshot (..), Snapshot (..))
import System.Process (proc, readCreateProcess)
import Test.QuickCheck (generate)
Expand Down Expand Up @@ -437,13 +439,19 @@ withDirectChainTest ::
IO a
withDirectChainTest tracer config ctx action = do
eventMVar <- newEmptyTMVarIO
events <- newTVarIO []

let callback = \event -> do
atomically $ putTMVar eventMVar event

wallet <- mkTinyWallet tracer config

withDirectChain tracer config ctx wallet initialChainState callback $ \Chain{postTx} -> do
let persistence :: PersistenceIncremental (HeadStateEvent Tx) IO =
PersistenceIncremental
{ append = \event -> do
atomically $ modifyTVar events (event :)
, loadAll = readTVarIO events
}
withDirectChain tracer config ctx wallet persistence initialChainState callback $ \Chain{postTx} -> do
action
DirectChainTest
{ postTx
Expand Down
2 changes: 1 addition & 1 deletion hydra-node/exe/hydra-node/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ main = do
nodeState <- createNodeState hs
ctx <- loadChainContext chainConfig party otherParties hydraScriptsTxId
wallet <- mkTinyWallet (contramap DirectChain tracer) chainConfig
withDirectChain (contramap DirectChain tracer) chainConfig ctx wallet (getChainState hs) (putEvent . OnChainEvent) $ \chain -> do
withDirectChain (contramap DirectChain tracer) chainConfig ctx wallet persistence (getChainState hs) (putEvent . OnChainEvent) $ \chain -> do
let RunOptions{host, port, peers, nodeId} = opts
putNetworkEvent (Authenticated msg otherParty) = putEvent $ NetworkEvent defaultTTL otherParty msg
RunOptions{apiHost, apiPort} = opts
Expand Down
8 changes: 5 additions & 3 deletions hydra-node/src/Hydra/Chain/Direct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ import Hydra.Chain.Direct.Wallet (
WalletInfoOnChain (..),
newTinyWallet,
)
import Hydra.HeadLogic (HeadStateEvent)
import Hydra.Logging (Tracer, traceWith)
import Hydra.Options (ChainConfig (..))
import Hydra.Party (Party)
import Hydra.Persistence (PersistenceIncremental)
import qualified Ouroboros.Consensus.HardFork.History as Consensus
import Ouroboros.Network.Magic (NetworkMagic (..))
import Ouroboros.Network.NodeToClient (
Expand All @@ -117,7 +119,6 @@ initialChainState =
ChainStateAt
{ chainState = Idle
, recordedAt = Nothing
, previous = Nothing
}

-- | Build the 'ChainContext' from a 'ChainConfig' and additional information.
Expand Down Expand Up @@ -185,10 +186,11 @@ withDirectChain ::
ChainConfig ->
ChainContext ->
TinyWallet IO ->
PersistenceIncremental (HeadStateEvent Tx) IO ->
-- | Last known chain state as loaded from persistence.
ChainStateAt ->
ChainComponent Tx IO a
withDirectChain tracer config ctx wallet chainStateAt callback action = do
withDirectChain tracer config ctx wallet persistence chainStateAt callback action = do
-- Last known point on chain as loaded from persistence.
let persistedPoint = recordedAt chainStateAt
queue <- newTQueueIO
Expand All @@ -210,7 +212,7 @@ withDirectChain tracer config ctx wallet chainStateAt callback action = do
localChainState
(submitTx queue)

let handler = chainSyncHandler tracer callback getTimeHandle ctx localChainState
let handler = chainSyncHandler tracer callback getTimeHandle ctx localChainState persistence
res <-
race
( handle onIOException $
Expand Down
62 changes: 41 additions & 21 deletions hydra-node/src/Hydra/Chain/Direct/Handlers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
-- `PostChainTx` and `OnChainTx`, and maintainance of on-chain relevant state.
module Hydra.Chain.Direct.Handlers where

import Hydra.Prelude

import qualified Cardano.Api.UTxO as UTxO
import Cardano.Slotting.Slot (SlotNo (..))
import Control.Concurrent.Class.MonadSTM (modifyTVar, newTVarIO, writeTVar)
Expand All @@ -37,7 +35,7 @@ import Hydra.Chain (
PostTxError (..),
)
import Hydra.Chain.Direct.State (
ChainContext (contestationPeriod),
ChainContext,
ChainState (Closed, Idle, Initial, Open),
ChainStateAt (..),
abort,
Expand All @@ -51,15 +49,19 @@ import Hydra.Chain.Direct.State (
initialize,
observeSomeTx,
)
import qualified Hydra.Chain.Direct.State as ChainState
import Hydra.Chain.Direct.TimeHandle (TimeHandle (..))
import Hydra.Chain.Direct.Wallet (
ErrCoverFee (..),
TinyWallet (..),
TinyWalletLog,
)
import Hydra.ContestationPeriod (toNominalDiffTime)
import Hydra.HeadLogic (HeadStateEvent (..))
import Hydra.Ledger (ChainSlot (ChainSlot))
import Hydra.Logging (Tracer, traceWith)
import Hydra.Persistence (PersistenceIncremental, loadAll)
import Hydra.Prelude
import Plutus.Orphans ()
import System.IO.Error (userError)
import Test.Cardano.Ledger.Alonzo.Serialisation.Generators ()
Expand All @@ -68,7 +70,7 @@ import Test.Cardano.Ledger.Alonzo.Serialisation.Generators ()
data LocalChainState m = LocalChainState
{ getLatest :: STM m ChainStateAt
, pushNew :: ChainStateAt -> STM m ()
, rollback :: ChainPoint -> STM m ChainStateAt
, rollback :: ChainPoint -> [ChainStateAt] -> STM m ChainStateAt
}

-- | Initialize a new local chain state with given 'ChainStateAt' (see also
Expand All @@ -88,22 +90,24 @@ newLocalChainState chainStateAt = do
where
getLatest tv = readTVar tv

pushNew tv cs =
modifyTVar tv $ \prev ->
cs{previous = Just prev}
pushNew tv cs = modifyTVar tv (const cs)

rollback tv point = do
latest <- readTVar tv
let rolledBack = go point latest
rollback tv point events = do
let maybeChainState =
events
& find
( \ChainStateAt{recordedAt} ->
case recordedAt of
Just recordPoint | recordPoint <= point -> True
_ -> False
)
-- TODO: using the same as defined in Direct module due to cyclic dependnecy.
let initialChainState = ChainStateAt{chainState = Idle, recordedAt = Nothing}
let rolledBack = fromMaybe initialChainState maybeChainState
-- REVIEW: what should we do with persisted events? prune? replace?
writeTVar tv rolledBack
pure rolledBack

go rollbackChainPoint = \case
cs@ChainStateAt{recordedAt = Just recordPoint}
| recordPoint <= rollbackChainPoint -> cs
ChainStateAt{previous = Just prev} -> go rollbackChainPoint prev
cs -> cs

-- * Posting Transactions

-- | A callback used to actually submit a transaction to the chain.
Expand Down Expand Up @@ -248,20 +252,37 @@ chainSyncHandler ::
-- | Contextual information about our chain connection.
ChainContext ->
LocalChainState m ->
PersistenceIncremental (HeadStateEvent Tx) m ->
-- | A chain-sync handler to use in a local-chain-sync client.
ChainSyncHandler m
chainSyncHandler tracer callback getTimeHandle ctx localChainState =
chainSyncHandler tracer callback getTimeHandle ctx localChainState persistence =
ChainSyncHandler
{ onRollBackward
, onRollForward
}
where
LocalChainState{rollback, getLatest, pushNew} = localChainState

loadChainStateEvents = do
events <- loadAll persistence
pure $
events
& mapMaybe
( \case
HeadInitialized{newChainState} -> Just newChainState
TxCommitted{newChainState} -> Just newChainState
HeadAborted{newChainState} -> Just newChainState
HeadOpened{newChainState} -> Just newChainState
HeadClosed{newChainState} -> Just newChainState
HeadFannedOut{newChainState} -> Just newChainState
_ -> Nothing
)

onRollBackward :: ChainPoint -> m ()
onRollBackward point = do
traceWith tracer $ RolledBackward{point}
rolledBackChainState <- atomically $ rollback point
chainStateEvents <- loadChainStateEvents
rolledBackChainState <- atomically $ rollback point chainStateEvents
callback Rollback{rolledBackChainState}

onRollForward :: BlockHeader -> [Tx] -> m ()
Expand Down Expand Up @@ -290,15 +311,14 @@ chainSyncHandler tracer callback getTimeHandle ctx localChainState =
Just event -> callback event

maybeObserveSomeTx point tx = atomically $ do
csa@ChainStateAt{chainState} <- getLatest
ChainStateAt{chainState} <- getLatest
case observeSomeTx ctx chainState tx of
Nothing -> pure Nothing
Just (observedTx, cs') -> do
let newChainState =
ChainStateAt
{ chainState = cs'
, recordedAt = Just point
, previous = Just csa
}
pushNew newChainState
pure $ Just Observation{observedTx, newChainState}
Expand Down Expand Up @@ -357,7 +377,7 @@ prepareTxToPost timeHandle wallet ctx cst@ChainStateAt{chainState} tx =

-- See ADR21 for context
calculateTxUpperBoundFromContestationPeriod currentTime = do
let effectiveDelay = min (toNominalDiffTime $ contestationPeriod ctx) maxGraceTime
let effectiveDelay = min (toNominalDiffTime $ ChainState.contestationPeriod ctx) maxGraceTime
let upperBoundTime = addUTCTime effectiveDelay currentTime
upperBoundSlot <- throwLeft $ slotFromUTCTime upperBoundTime
pure (upperBoundSlot, upperBoundTime)
Expand Down
1 change: 0 additions & 1 deletion hydra-node/src/Hydra/Chain/Direct/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ class HasKnownUTxO a where
data ChainStateAt = ChainStateAt
{ chainState :: ChainState
, recordedAt :: Maybe ChainPoint
, previous :: Maybe ChainStateAt
}
deriving (Eq, Show, Generic, ToJSON, FromJSON)

Expand Down
2 changes: 0 additions & 2 deletions hydra-node/src/Hydra/HeadLogic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,6 @@ onInitialChainCommitTx ::
UTxOType tx ->
Outcome tx
onInitialChainCommitTx st newChainState pt utxo =
-- HeadStateEvent: TxCommitted
NewState events
`Combined` Effects
( notifyClient
Expand Down Expand Up @@ -874,7 +873,6 @@ onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds =
let snapshotSignature = sign signingKey nextSnapshot
-- Spec: T̂ ← {tx | ∀tx ∈ T̂ , Û ◦ tx ≠ ⊥} and L̂ ← Û ◦ T̂
let (seenTxs', seenUTxO') = pruneTransactions u
-- HeadStateEvent: ReqSnReceived
NewState [ReqSnReceived{nextSnapshot, seenTxs = seenTxs', seenUTxO = seenUTxO'}]
`Combined` Effects [NetworkEffect $ AckSn snapshotSignature sn]
where
Expand Down
Loading

0 comments on commit 1fc3704

Please sign in to comment.