Skip to content

Commit

Permalink
chore: audit store/streaming/file/service.go (backport #14234) (#14240)
Browse files Browse the repository at this point in the history
Co-authored-by: Aleksandr Bezobchuk <[email protected]>
Co-authored-by: Julien Robert <[email protected]>
  • Loading branch information
3 people authored Dec 12, 2022
1 parent 9b2773a commit 3a51859
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 66 deletions.
4 changes: 2 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,10 @@ func DefaultConfig() *Config {
Streamers: StreamersConfig{
File: FileStreamerConfig{
Keys: []string{"*"},
WriteDir: "data/file_streamer",
WriteDir: "",
OutputMetadata: true,
StopNodeOnError: true,
// NOTICE: the default config don't protect the streamer data integrity
// NOTICE: The default config doesn't protect the streamer data integrity
// in face of system crash.
Fsync: false,
},
Expand Down
4 changes: 3 additions & 1 deletion server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,14 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
write_dir = "{{ .Streamers.File.WriteDir }}"
prefix = "{{ .Streamers.File.Prefix }}"
# output-metadata specifies if output the metadata file which includes the abci request/responses
# during processing the block.
output-metadata = "{{ .Streamers.File.OutputMetadata }}"
# stop-node-on-error specifies if propagate the file streamer errors to consensus state machine.
stop-node-on-error = "{{ .Streamers.File.StopNodeOnError }}"
# fsync specifies if call fsync after writing the files.
fsync = "{{ .Streamers.File.Fsync }}"
Expand All @@ -265,7 +268,6 @@ fsync = "{{ .Streamers.File.Fsync }}"
# Setting max_txs to negative 1 (-1) will disable transactions from being inserted into the mempool.
# Setting max_txs to a positive number (> 0) will limit the number of transactions in the mempool, by the specified amount.
max-txs = "{{ .Mempool.MaxTxs }}"
`

var configTemplate *template.Template
Expand Down
115 changes: 73 additions & 42 deletions store/streaming/file/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (

var _ baseapp.StreamingService = &StreamingService{}

// StreamingService is a concrete implementation of StreamingService that writes state changes out to files
// StreamingService is a concrete implementation of StreamingService that writes
// state changes out to files.
type StreamingService struct {
storeListeners []*types.MemoryListener // a series of KVStore listeners for each KVStore
filePrefix string // optional prefix for each of the generated files
Expand All @@ -32,103 +33,125 @@ type StreamingService struct {

currentBlockNumber int64
blockMetadata types.BlockMetadata
// if write the metadata file, otherwise only data file is outputted.

// outputMetadata, if true, writes additional metadata to file per block
outputMetadata bool
// if true, when commit failed it will panic and stop the consensus state machine to ensure the
// eventual consistency of the output, otherwise the error is ignored and have the risk of lossing data.

// stopNodeOnErr, if true, will panic and stop the node during ABCI Commit
// to ensure eventual consistency of the output, otherwise, any errors are
// logged and ignored which could yield data loss in streamed output.
stopNodeOnErr bool
// if true, the file.Sync() is called to make sure the data is persisted onto disk, otherwise it risks lossing data when system crash.

// fsync, if true, will execute file Sync to make sure the data is persisted
// onto disk, otherwise there is a risk of data loss during any crash.
fsync bool
}

// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys
func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec, logger log.Logger, outputMetadata bool, stopNodeOnErr bool, fsync bool) (*StreamingService, error) {
func NewStreamingService(
writeDir, filePrefix string,
storeKeys []types.StoreKey,
cdc codec.BinaryCodec,
logger log.Logger,
outputMetadata, stopNodeOnErr, fsync bool,
) (*StreamingService, error) {
// sort storeKeys for deterministic output
sort.SliceStable(storeKeys, func(i, j int) bool {
return storeKeys[i].Name() < storeKeys[j].Name()
})

// NOTE: We use the same listener for each store.
listeners := make([]*types.MemoryListener, len(storeKeys))
// in this case, we are using the same listener for each Store
for i, key := range storeKeys {
listeners[i] = types.NewMemoryListener(key)
}
// check that the writeDir exists and is writable so that we can catch the error here at initialization if it is not
// we don't open a dstFile until we receive our first ABCI message

// Check that the writeDir exists and is writable so that we can catch the
// error here at initialization. If it is not we don't open a dstFile until we
// receive our first ABCI message.
if err := isDirWriteable(writeDir); err != nil {
return nil, err
}

return &StreamingService{
storeListeners: listeners,
filePrefix: filePrefix,
writeDir: writeDir,
codec: c,
codec: cdc,
logger: logger,
outputMetadata: outputMetadata,
stopNodeOnErr: stopNodeOnErr,
fsync: fsync,
}, nil
}

// Listeners satisfies the baseapp.StreamingService interface
// It returns the StreamingService's underlying WriteListeners
// Use for registering the underlying WriteListeners with the BaseApp
// Listeners satisfies the StreamingService interface. It returns the
// StreamingService's underlying WriteListeners. Use for registering the
// underlying WriteListeners with the BaseApp.
func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener {
listeners := make(map[types.StoreKey][]types.WriteListener, len(fss.storeListeners))
for _, listener := range fss.storeListeners {
listeners[listener.StoreKey()] = []types.WriteListener{listener}
}

return listeners
}

// ListenBeginBlock satisfies the baseapp.ABCIListener interface
// It writes the received BeginBlock request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) (rerr error) {
// ListenBeginBlock satisfies the ABCIListener interface. It sets the received
// BeginBlock request, response and the current block number. Note, these are
// not written to file until ListenCommit is executed and outputMetadata is set,
// after which it will be reset again on the next block.
func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
fss.blockMetadata.RequestBeginBlock = &req
fss.blockMetadata.ResponseBeginBlock = &res
fss.currentBlockNumber = req.Header.Height
return nil
}

// ListenDeliverTx satisfies the baseapp.ABCIListener interface
// It writes the received DeliverTx request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) (rerr error) {
// ListenDeliverTx satisfies the ABCIListener interface. It appends the received
// DeliverTx request and response to a list of DeliverTxs objects. Note, these
// are not written to file until ListenCommit is executed and outputMetadata is
// set, after which it will be reset again on the next block.
func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error {
fss.blockMetadata.DeliverTxs = append(fss.blockMetadata.DeliverTxs, &types.BlockMetadata_DeliverTx{
Request: &req,
Response: &res,
})

return nil
}

// ListenEndBlock satisfies the baseapp.ABCIListener interface
// It writes the received EndBlock request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) (rerr error) {
// ListenEndBlock satisfies the ABCIListener interface. It sets the received
// EndBlock request, response and the current block number. Note, these are
// not written to file until ListenCommit is executed and outputMetadata is set,
// after which it will be reset again on the next block.
func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error {
fss.blockMetadata.RequestEndBlock = &req
fss.blockMetadata.ResponseEndBlock = &res
return nil
}

// ListenEndBlock satisfies the baseapp.ABCIListener interface
// ListenCommit satisfies the ABCIListener interface. It is executed during the
// ABCI Commit request and is responsible for writing all staged data to files.
// It will only return a non-nil error when stopNodeOnErr is set.
func (fss *StreamingService) ListenCommit(ctx context.Context, res abci.ResponseCommit) error {
err := fss.doListenCommit(ctx, res)
if err != nil {
fss.logger.Error("Commit listening hook failed", "height", fss.currentBlockNumber, "err", err)
if err := fss.doListenCommit(ctx, res); err != nil {
fss.logger.Error("Listen commit failed", "height", fss.currentBlockNumber, "err", err)
if fss.stopNodeOnErr {
return err
}
}

return nil
}

func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.ResponseCommit) (err error) {
fss.blockMetadata.ResponseCommit = &res

// write to target files, the file size is written at the beginning, which can be used to detect completeness.
// Write to target files, the file size is written at the beginning, which can
// be used to detect completeness.
metaFileName := fmt.Sprintf("block-%d-meta", fss.currentBlockNumber)
dataFileName := fmt.Sprintf("block-%d-data", fss.currentBlockNumber)

if fss.filePrefix != "" {
metaFileName = fmt.Sprintf("%s-%s", fss.filePrefix, metaFileName)
dataFileName = fmt.Sprintf("%s-%s", fss.filePrefix, dataFileName)
Expand All @@ -139,6 +162,7 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon
if err != nil {
return err
}

if err := writeLengthPrefixedFile(path.Join(fss.writeDir, metaFileName), bz, fss.fsync); err != nil {
return err
}
Expand All @@ -148,42 +172,44 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon
if err := fss.writeBlockData(&buf); err != nil {
return err
}

return writeLengthPrefixedFile(path.Join(fss.writeDir, dataFileName), buf.Bytes(), fss.fsync)
}

func (fss *StreamingService) writeBlockData(writer io.Writer) error {
for _, listener := range fss.storeListeners {
cache := listener.PopStateCache()

for i := range cache {
bz, err := fss.codec.MarshalLengthPrefixed(&cache[i])
if err != nil {
return err
}
if _, err = writer.Write(bz); err != nil {

if _, err := writer.Write(bz); err != nil {
return err
}
}
}
return nil
}

// Stream satisfies the baseapp.StreamingService interface
func (fss *StreamingService) Stream(wg *sync.WaitGroup) error {
return nil
}

// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface
func (fss *StreamingService) Close() error {
return nil
}
// Stream satisfies the StreamingService interface. It performs a no-op.
func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { return nil }

// Close satisfies the StreamingService interface. It performs a no-op.
func (fss *StreamingService) Close() error { return nil }

// isDirWriteable checks if dir is writable by writing and removing a file
// to dir. It returns nil if dir is writable.
// to dir. It returns nil if dir is writable. We have to do this as there is no
// platform-independent way of determining if a directory is writeable.
func isDirWriteable(dir string) error {
f := path.Join(dir, ".touch")
if err := os.WriteFile(f, []byte(""), 0o600); err != nil {
return err
}

return os.Remove(f)
}

Expand All @@ -193,25 +219,30 @@ func writeLengthPrefixedFile(path string, data []byte, fsync bool) (err error) {
if err != nil {
return sdkerrors.Wrapf(err, "open file failed: %s", path)
}

defer func() {
// avoid overriding the real error with file close error
if err1 := f.Close(); err1 != nil && err == nil {
err = sdkerrors.Wrapf(err, "close file failed: %s", path)
}
}()

_, err = f.Write(sdk.Uint64ToBigEndian(uint64(len(data))))
if err != nil {
return sdkerrors.Wrapf(err, "write length prefix failed: %s", path)
}

_, err = f.Write(data)
if err != nil {
return sdkerrors.Wrapf(err, "write block data failed: %s", path)
}

if fsync {
err = f.Sync()
if err != nil {
return sdkerrors.Wrapf(err, "fsync failed: %s", path)
}
}
return

return err
}
Loading

0 comments on commit 3a51859

Please sign in to comment.