Skip to content

Commit

Permalink
Populate epoch_stake table earlier
Browse files Browse the repository at this point in the history
Fixes #1406
  • Loading branch information
kderme committed Aug 17, 2023
1 parent bf2112b commit 90cd202
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 43 deletions.
1 change: 1 addition & 0 deletions cardano-db-sync/cardano-db-sync.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ library

Cardano.DbSync.Rollback

Cardano.DbSync.Fix.EpochStake
Cardano.DbSync.Fix.PlutusDataBytes
Cardano.DbSync.Fix.PlutusScripts
Cardano.DbSync.LocalStateQuery
Expand Down
2 changes: 2 additions & 0 deletions cardano-db-sync/src/Cardano/DbSync/Default.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import Database.Persist.SqlBackend.Internal.StatementCache
import Ouroboros.Consensus.Cardano.Block (HardForkBlock (..))
import qualified Ouroboros.Consensus.HardFork.Combinator as Consensus
import Ouroboros.Network.Block (blockHash, blockNo, getHeaderFields, headerFieldBlockNo, unBlockNo)
import Cardano.DbSync.Fix.EpochStake

insertListBlocks ::
SyncEnv ->
Expand Down Expand Up @@ -81,6 +82,7 @@ applyAndInsertBlockMaybe syncEnv cblk = do
, ". Time to restore consistency."
]
rollbackFromBlockNo syncEnv (blockNo cblk)
void $ migrateStakeDistr syncEnv (apOldLedger applyRes)
insertBlock syncEnv cblk applyRes True tookSnapshot
liftIO $ setConsistentLevel syncEnv Consistent
Right blockId | Just (adaPots, slotNo, epochNo) <- getAdaPots applyRes -> do
Expand Down
63 changes: 37 additions & 26 deletions cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Generic/StakeDist.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,44 +73,50 @@ getSecurityParameter = maxRollbacks . configSecurityParam . pInfoConfig
getStakeSlice ::
ConsensusProtocol (BlockProtocol blk) =>
ProtocolInfo blk ->
EpochNo ->
Word64 ->
Word64 ->
ExtLedgerState CardanoBlock ->
Bool ->
StakeSliceRes
getStakeSlice pInfo epoch !sliceIndex !minSliceSize els =
getStakeSlice pInfo !epochBlockNo els isMigration =
case ledgerState els of
LedgerStateByron _ -> NoSlices
LedgerStateShelley sls -> genericStakeSlice pInfo epoch sliceIndex minSliceSize sls
LedgerStateAllegra als -> genericStakeSlice pInfo epoch sliceIndex minSliceSize als
LedgerStateMary mls -> genericStakeSlice pInfo epoch sliceIndex minSliceSize mls
LedgerStateAlonzo als -> genericStakeSlice pInfo epoch sliceIndex minSliceSize als
LedgerStateBabbage bls -> genericStakeSlice pInfo epoch sliceIndex minSliceSize bls
LedgerStateConway cls -> genericStakeSlice pInfo epoch sliceIndex minSliceSize cls
LedgerStateShelley sls -> genericStakeSlice pInfo epochBlockNo sls isMigration
LedgerStateAllegra als -> genericStakeSlice pInfo epochBlockNo als isMigration
LedgerStateMary mls -> genericStakeSlice pInfo epochBlockNo mls isMigration
LedgerStateAlonzo als -> genericStakeSlice pInfo epochBlockNo als isMigration
LedgerStateBabbage bls -> genericStakeSlice pInfo epochBlockNo bls isMigration
LedgerStateConway cls -> genericStakeSlice pInfo epochBlockNo cls isMigration

genericStakeSlice ::
forall era c blk p.
(c ~ StandardCrypto, EraCrypto era ~ c, ConsensusProtocol (BlockProtocol blk)) =>
ProtocolInfo blk ->
EpochNo ->
Word64 ->
Word64 ->
LedgerState (ShelleyBlock p era) ->
Bool ->
StakeSliceRes
genericStakeSlice pInfo epoch sliceIndex minSliceSize lstate
genericStakeSlice pInfo epochBlockNo lstate isMigration
| index > delegationsLen = NoSlices
| index == delegationsLen = Slice (emptySlice epoch) True
| index + epochSliceSize > delegationsLen = Slice (mkSlice (delegationsLen - index)) True
| otherwise = Slice (mkSlice epochSliceSize) False
| index + size > delegationsLen = Slice (mkSlice (delegationsLen - index)) True
| otherwise = Slice (mkSlice size) False
where
-- We use 'ssStakeSet' here instead of 'ssStateMark' because the stake addresses for the
-- later may not have been added to the database yet. That means that when these values
epoch :: EpochNo
epoch = 1 + Shelley.nesEL (Consensus.shelleyLedgerState lstate)

minSliceSize :: Word64
minSliceSize = 2000

-- On mainnet this is 2160
k :: Word64
k = getSecurityParameter pInfo

-- We use 'ssStakeMark' here. That means that when these values
-- are added to the database, the epoch number where they become active is the current
-- epoch plus one.

stakeSnapshot :: Ledger.SnapShot c
stakeSnapshot =
Ledger.ssStakeSet . Shelley.esSnapshots . Shelley.nesEs $
Ledger.ssStakeMark . Shelley.esSnapshots . Shelley.nesEs $
Consensus.shelleyLedgerState lstate

delegations :: VMap.KVVector VB VB (Credential 'Staking c, KeyHash 'StakePool c)
Expand All @@ -132,10 +138,6 @@ genericStakeSlice pInfo epoch sliceIndex minSliceSize lstate
epochSliceSize =
max minSliceSize defaultEpochSliceSize
where
-- On mainnet this is 2160
k :: Word64
k = getSecurityParameter pInfo

-- On mainnet this is 21600
expectedBlocks :: Word64
expectedBlocks = 10 * k
Expand All @@ -147,20 +149,29 @@ genericStakeSlice pInfo epoch sliceIndex minSliceSize lstate

-- The starting index of the data in the delegation vector.
index :: Word64
index = sliceIndex * epochSliceSize
index
| isMigration = 0
| epochBlockNo < k = delegationsLen + 1 -- so it creates the empty Slice.
| otherwise = (epochBlockNo - k) * epochSliceSize

size :: Word64
size
| isMigration , epochBlockNo + 1 < k = 0
| isMigration = (epochBlockNo + 1 - k) * epochSliceSize
| otherwise = epochSliceSize

mkSlice :: Word64 -> StakeSlice
mkSlice size =
mkSlice actualSize =
StakeSlice
{ sliceEpochNo = epoch
, sliceDistr = distribution
}
where
delegationsSliced :: VMap VB VB (Credential 'Staking c) (KeyHash 'StakePool c)
delegationsSliced = VMap $ VG.slice (fromIntegral index) (fromIntegral size) delegations
delegationsSliced = VMap $ VG.slice (fromIntegral index) (fromIntegral actualSize) delegations

distribution :: Map StakeCred (Coin, PoolKeyHash)
distribution =
VMap.toMap $
VMap.mapMaybe id $
VMap.mapWithKey (\k p -> (,p) <$> lookupStake k) delegationsSliced
VMap.mapWithKey (\a p -> (,p) <$> lookupStake a) delegationsSliced
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ insertStakeSlice _ Generic.NoSlices = pure ()
insertStakeSlice syncEnv (Generic.Slice slice finalSlice) = do
insertEpochStake (envCache syncEnv) network (Generic.sliceEpochNo slice) (Map.toList $ Generic.sliceDistr slice)
when finalSlice $ do
lift $ DB.updateSetComplete $ unEpochNo $ Generic.sliceEpochNo slice
size <- lift $ DB.queryEpochStakeCount (unEpochNo $ Generic.sliceEpochNo slice)
liftIO . logInfo tracer $ mconcat ["Inserted ", show size, " EpochStake for ", show (Generic.sliceEpochNo slice)]
liftIO . logInfo tracer $
mconcat ["Inserted ", show size, " EpochStake for ", show (Generic.sliceEpochNo slice)]
where
tracer :: Trace IO Text
tracer = getTrace syncEnv
Expand Down
69 changes: 69 additions & 0 deletions cardano-db-sync/src/Cardano/DbSync/Fix/EpochStake.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}

module Cardano.DbSync.Fix.EpochStake where

import Cardano.DbSync.Api
import Cardano.DbSync.Api.Types
import Cardano.DbSync.Ledger.State
import Cardano.DbSync.Ledger.Types
import qualified Cardano.Db as DB
import qualified Data.Map.Strict as Map
import Cardano.DbSync.Era.Shelley.Generic.StakeDist hiding (getStakeSlice)
import qualified Data.Strict.Maybe as Strict
import Database.Persist.Sql (SqlBackend)
import Cardano.Prelude
import Cardano.BM.Trace (logInfo, logWarning)
import Cardano.DbSync.Era.Shelley.Insert.Epoch
import Cardano.DbSync.Error
import Control.Monad.Trans.Control

migrateStakeDistr :: (MonadIO m, MonadBaseControl IO m) => SyncEnv -> Strict.Maybe CardanoLedgerState -> ExceptT SyncNodeError (ReaderT SqlBackend m) Bool
migrateStakeDistr env mcls =
case (envLedgerEnv env, mcls) of
(HasLedger lenv, Strict.Just cls) -> do
ems <- lift DB.queryAllExtraMigrations
runWhen (DB.isStakeDistrComplete ems) $ do
liftIO $ logInfo trce "Starting Stake Distribution migration on table epoch_stake"
let stakeSlice = getStakeSlice lenv cls True
case stakeSlice of
NoSlices ->
liftIO $ logInsert 0
Slice (StakeSlice _epochNo distr) isFinal -> do
liftIO $ logInsert (Map.size distr)
insertStakeSlice env stakeSlice
(mminEpoch, mmaxEpoch) <- lift DB.queryMinMaxEpochStake
liftIO $ logMinMax mminEpoch mmaxEpoch
case (mminEpoch, mmaxEpoch) of
(Just minEpoch, Just maxEpoch) -> do
when (maxEpoch > 0) $
lift $ DB.insertEpochStakeProgress (mkProgress True <$> [minEpoch..(maxEpoch-1)])
lift $ DB.insertEpochStakeProgress [mkProgress isFinal maxEpoch]
_ -> pure ()
lift $ DB.insertExtraMigration DB.StakeDistrEnded
_ -> pure False
where
trce = getTrace env
mkProgress isCompleted e =
DB.EpochStakeProgress
{ DB.epochStakeProgressEpochNo = e
, DB.epochStakeProgressCompleted = isCompleted
}

logInsert :: Int -> IO ()
logInsert n
| n == 0 = logInfo trce "No missing epoch_stake found"
| n > 100000 = logWarning trce $ "Found " <> DB.textShow n <> " epoch_stake. This may take a while"
| otherwise = logInfo trce $ "Found " <> DB.textShow n <> " epoch_stake"

logMinMax mmin mmax =
logInfo trce $ mconcat
[ "Min epoch_stake at "
, DB.textShow mmin
, " and max at "
, DB.textShow mmax
]

runWhen :: Monad m => Bool -> m () -> m Bool
runWhen a action = do
if a then action >> pure True else pure False
33 changes: 18 additions & 15 deletions cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ module Cardano.DbSync.Ledger.State (
hashToAnnotation,
getHeaderHash,
runLedgerStateWriteThread,
getStakeSlice,
getSliceMeta,
) where

import Cardano.BM.Trace (Trace, logInfo, logWarning)
Expand Down Expand Up @@ -230,8 +232,9 @@ applyBlock env blk = do
{ apPrices = getPrices newState
, apPoolsRegistered = getRegisteredPools oldState
, apNewEpoch = maybeToStrict newEpoch
, apOldLedger = Strict.Just oldState
, apSlotDetails = details
, apStakeSlice = stakeSlice newState details
, apStakeSlice = getStakeSlice env newState False
, apEvents = ledgerEvents
, apDepositsMap = DepositsMap deposits
}
Expand Down Expand Up @@ -265,20 +268,20 @@ applyBlock env blk = do
applyToEpochBlockNo _ _ GenesisEpochBlockNo = EpochBlockNo 0
applyToEpochBlockNo _ _ EBBEpochBlockNo = EpochBlockNo 0

stakeSliceMinSize :: Word64
stakeSliceMinSize = 2000

stakeSlice :: CardanoLedgerState -> SlotDetails -> Generic.StakeSliceRes
stakeSlice cls details =
case clsEpochBlockNo cls of
EpochBlockNo n ->
Generic.getStakeSlice
(leProtocolInfo env)
(sdEpochNo details)
n
stakeSliceMinSize
(clsState cls)
_ -> Generic.NoSlices
getStakeSlice :: HasLedgerEnv -> CardanoLedgerState -> Bool -> Generic.StakeSliceRes
getStakeSlice env cls isMigration =
case clsEpochBlockNo cls of
EpochBlockNo n ->
Generic.getStakeSlice
(leProtocolInfo env)
n
(clsState cls)
isMigration
_ -> Generic.NoSlices

getSliceMeta :: Generic.StakeSliceRes -> Maybe (Bool, EpochNo)
getSliceMeta (Generic.Slice (Generic.StakeSlice epochNo _) isFinal) = Just (isFinal, epochNo)
getSliceMeta _ = Nothing

storeSnapshotAndCleanupMaybe ::
HasLedgerEnv ->
Expand Down
2 changes: 2 additions & 0 deletions cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ data ApplyResult = ApplyResult
{ apPrices :: !(Strict.Maybe Prices) -- prices after the block application
, apPoolsRegistered :: !(Set.Set PoolKeyHash) -- registered before the block application
, apNewEpoch :: !(Strict.Maybe Generic.NewEpoch) -- Only Just for a single block at the epoch boundary
, apOldLedger :: !(Strict.Maybe CardanoLedgerState)
, apSlotDetails :: !SlotDetails
, apStakeSlice :: !Generic.StakeSliceRes
, apEvents :: ![LedgerEvent]
Expand All @@ -136,6 +137,7 @@ defaultApplyResult slotDetails =
{ apPrices = Strict.Nothing
, apPoolsRegistered = Set.empty
, apNewEpoch = Strict.Nothing
, apOldLedger = Strict.Nothing
, apSlotDetails = slotDetails
, apStakeSlice = Generic.NoSlices
, apEvents = []
Expand Down
18 changes: 18 additions & 0 deletions cardano-db/src/Cardano/Db/Insert.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ module Cardano.Db.Insert (
insertCheckPoolOfflineFetchError,
insertReservedPoolTicker,
insertDelistedPool,
insertExtraMigration,
insertEpochStakeProgress,
updateSetComplete,
replaceAdaPots,
insertUnchecked,
insertMany',
Expand Down Expand Up @@ -110,6 +113,10 @@ import Database.Persist.Types (
entityKey,
)
import Database.PostgreSQL.Simple (SqlError)
import Cardano.Db.Types
import Cardano.Db.Text
import Data.Word (Word64)
import Database.Persist (updateWhere, (=.), (==.))

-- The original naive way of inserting rows into Postgres was:
--
Expand Down Expand Up @@ -281,6 +288,17 @@ insertReservedPoolTicker ticker = do
insertDelistedPool :: (MonadBaseControl IO m, MonadIO m) => DelistedPool -> ReaderT SqlBackend m DelistedPoolId
insertDelistedPool = insertCheckUnique "DelistedPool"

insertExtraMigration :: (MonadBaseControl IO m, MonadIO m) => ExtraMigration -> ReaderT SqlBackend m ()
insertExtraMigration token = void . insert $ ExtraMigrations (textShow token) (Just $ extraDescription token)

insertEpochStakeProgress :: (MonadBaseControl IO m, MonadIO m) => [EpochStakeProgress] -> ReaderT SqlBackend m ()
insertEpochStakeProgress =
insertManyUncheckedUnique "Many EpochStakeProgress"

updateSetComplete :: MonadIO m => Word64 -> ReaderT SqlBackend m ()
updateSetComplete epoch = do
updateWhere [EpochStakeProgressEpochNo ==. epoch] [EpochStakeProgressCompleted =. True]

replaceAdaPots :: (MonadBaseControl IO m, MonadIO m) => BlockId -> AdaPots -> ReaderT SqlBackend m Bool
replaceAdaPots blockId adapots = do
mAdaPotsId <- queryAdaPotsId blockId
Expand Down
25 changes: 24 additions & 1 deletion cardano-db/src/Cardano/Db/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ module Cardano.Db.Query (
existsPoolMetadataRefId,
queryAdaPotsId,
queryBlockHeight,
queryAllExtraMigrations,
queryMinMaxEpochStake,
-- queries used in smash
queryPoolOfflineData,
queryPoolRegister,
Expand Down Expand Up @@ -126,7 +128,7 @@ import Data.ByteString.Char8 (ByteString)
import Data.Fixed (Micro)
import Data.Maybe (fromMaybe, listToMaybe, mapMaybe)
import Data.Ratio (numerator)
import Data.Text (Text)
import Data.Text (Text, unpack)
import Data.Time.Clock (UTCTime (..))
import Data.Tuple.Extra (uncurry3)
import Data.Word (Word64)
Expand Down Expand Up @@ -755,6 +757,27 @@ queryBlockHeight = do
pure (blk ^. BlockBlockNo)
pure $ unValue =<< listToMaybe res

queryAllExtraMigrations :: MonadIO m => ReaderT SqlBackend m [ExtraMigration]
queryAllExtraMigrations = do
res <- select $ do
ems <- from $ table @ExtraMigrations
pure (ems ^. ExtraMigrationsToken)
pure $ read . unpack . unValue <$> res

queryMinMaxEpochStake :: MonadIO m => ReaderT SqlBackend m (Maybe Word64, Maybe Word64)
queryMinMaxEpochStake = do
maxEpoch <- select $ do
es <- from $ table @EpochStake
orderBy [desc (es ^. EpochStakeId)]
limit 1
pure (es ^. EpochStakeEpochNo)
minEpoch <- select $ do
es <- from $ table @EpochStake
orderBy [asc (es ^. EpochStakeId)]
limit 1
pure (es ^. EpochStakeEpochNo)
pure (unValue <$> listToMaybe minEpoch, unValue <$> listToMaybe maxEpoch)

{--------------------------------------------
Queries use in SMASH
----------------------------------------------}
Expand Down
9 changes: 9 additions & 0 deletions cardano-db/src/Cardano/Db/Schema.hs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ share
epochNo Word64 sqltype=word31type
UniqueStake epochNo addrId poolId

EpochStakeProgress
epochNo Word64 sqltype=word31type
completed Bool
UniqueEpochStakeProgress epochNo

Treasury
addrId StakeAddressId noreference
certIndex Word16
Expand Down Expand Up @@ -485,6 +490,10 @@ share
costs Text sqltype=jsonb
UniqueCostModel hash

ExtraMigrations
token Text
description Text Maybe

-- -----------------------------------------------------------------------------------------------
-- Pool offline (ie not on the blockchain) data.

Expand Down
Loading

0 comments on commit 90cd202

Please sign in to comment.