Skip to content

Commit

Permalink
Implement store
Browse files Browse the repository at this point in the history
Persisting ledger writes to the file system into the store.log file in
the application-go directory. The write values are converted from bytes
to a string when the read write sets are unwrapped in the transaction
processor.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
  • Loading branch information
twoGiants committed Dec 28, 2024
1 parent 0b9a1f0 commit b0ab773
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 15 deletions.
74 changes: 60 additions & 14 deletions off_chain_data/application-go/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,28 @@ package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"offChainData/parser"
"os"
"slices"
"strconv"
"strings"

"github.com/hyperledger/fabric-gateway/pkg/client"
"google.golang.org/grpc"
)

var checkpointFile = envOrDefault("CHECKPOINT_FILE", "checkpoint.json")
var storeFile = envOrDefault("STORE_FILE", "store.log")
var simulatedFailureCount = getSimulatedFailureCount()

const startBlock = 0

var transactionCount uint = 0 // Used only to simulate failures

// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
type store = func(data ledgerUpdate)

Expand All @@ -27,20 +37,59 @@ type ledgerUpdate struct {
// Description of a ledger write that can be applied to an off-chain data store.
type write struct {
// Channel whose ledger is being updated.
channelName string
ChannelName string `json:"channelName"`
// Namespace within the ledger.
namespace string
Namespace string `json:"namespace"`
// Key name within the ledger namespace.
key string
Key string `json:"key"`
// Whether the key and associated value are being deleted.
isDelete bool
// If `isDelete` is false, the value written to the key; otherwise ignored.
value []byte
IsDelete bool `json:"isDelete"`
// If `isDelete` is false, the Value written to the key; otherwise ignored.
Value string `json:"value"`
}

// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
// This implementation just writes to a file.
var applyWritesToOffChainStore = func(data ledgerUpdate) {
if err := simulateFailureIfRequired(); err != nil {
fmt.Println("[expected error]: " + err.Error())
return
}

writes := []string{}
for _, write := range data.writes {
marshaled, err := json.Marshal(write)
if err != nil {
panic(err)
}

writes = append(writes, string(marshaled))
}

f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
panic(err)
}

if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil {
f.Close()
panic(err)
}

if err := f.Close(); err != nil {
panic(err)
}
}

func simulateFailureIfRequired() error {
if simulatedFailureCount > 0 && transactionCount >= simulatedFailureCount {
transactionCount = 0
return errors.New("simulated write failure")
}

transactionCount += 1

return nil
}

func listen(clientConnection *grpc.ClientConn) {
Expand All @@ -62,7 +111,7 @@ func listen(clientConnection *grpc.ClientConn) {
fmt.Printf("Start event listening from block %d\n", checkpointer.BlockNumber())
fmt.Printf("Last processed transaction ID within block: %s\n", checkpointer.TransactionID())
if simulatedFailureCount > 0 {
fmt.Printf("Simulating a write failure every %d transactions", simulatedFailureCount)
fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount)
}

// TODO put into infinite loop like in public docs example
Expand All @@ -71,8 +120,8 @@ func listen(clientConnection *grpc.ClientConn) {

blocks, err := network.BlockEvents(
ctx,
client.WithStartBlock(0),
client.WithCheckpoint(checkpointer),
client.WithStartBlock(startBlock), // Used only if there is no checkpoint block number
)
if err != nil {
panic(err)
Expand Down Expand Up @@ -200,8 +249,6 @@ type transactionProcessor struct {
func (t *transactionProcessor) process() {
transactionId := t.transaction.ChannelHeader().GetTxId()

fmt.Println("Process transaction", transactionId)

writes := t.writes()
if len(writes) == 0 {
fmt.Println("Skipping read-only or system transaction", transactionId)
Expand Down Expand Up @@ -232,14 +279,13 @@ func (t *transactionProcessor) writes() []write {
namespace := readWriteSet.Namespace()

for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() {
aWrite := write{
writes = append(writes, write{
channelName,
namespace,
kvWrite.GetKey(),
kvWrite.GetIsDelete(),
kvWrite.GetValue(),
}
writes = append(writes, aWrite)
string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion off_chain_data/application-go/transact.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newTransactApp(smartContract *atb.AssetTransferBasic) *transactApp {

func (t *transactApp) run() {
for i := 0; i < int(t.batchSize); i++ {
go t.transact()
t.transact()
}
}

Expand Down

0 comments on commit b0ab773

Please sign in to comment.