From 48edc602cff221067aad53e0bb7e1f0622c2f9a2 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 13 Feb 2024 17:55:52 -0600 Subject: [PATCH 1/2] Add utility to extract payloads by addresses This utility can be used to create a subset of execution state which can save time during development, testing, and support/troubleshooting. --- .../cmd/extract-payloads-by-address/cmd.go | 262 ++++++++++++++++++ .../extract_payloads_test.go | 241 ++++++++++++++++ 2 files changed, 503 insertions(+) create mode 100644 cmd/util/cmd/extract-payloads-by-address/cmd.go create mode 100644 cmd/util/cmd/extract-payloads-by-address/extract_payloads_test.go diff --git a/cmd/util/cmd/extract-payloads-by-address/cmd.go b/cmd/util/cmd/extract-payloads-by-address/cmd.go new file mode 100644 index 00000000000..acf54c07b49 --- /dev/null +++ b/cmd/util/cmd/extract-payloads-by-address/cmd.go @@ -0,0 +1,262 @@ +package extractpayloads + +import ( + "bufio" + "bytes" + "encoding/binary" + "encoding/hex" + "fmt" + "io" + "os" + "strings" + + "github.com/fxamacker/cbor/v2" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-go/ledger" +) + +const ( + defaultBufioWriteSize = 1024 * 32 + defaultBufioReadSize = 1024 * 32 + + payloadEncodingVersion = 1 +) + +var ( + flagInputPayloadFileName string + flagOutputPayloadFileName string + flagAddresses string +) + +var Cmd = &cobra.Command{ + Use: "extract-payload-by-address", + Short: "Read payload file and generate payload file containing payloads with specified addresses", + Run: run, +} + +func init() { + Cmd.Flags().StringVar( + &flagInputPayloadFileName, + "input-filename", + "", + "Input payload file name") + _ = Cmd.MarkFlagRequired("input-filename") + + Cmd.Flags().StringVar( + &flagOutputPayloadFileName, + "output-filename", + "", + "Output payload file name") + _ = Cmd.MarkFlagRequired("output-filename") + + Cmd.Flags().StringVar( + &flagAddresses, + "addresses", + "", + "extract payloads of addresses (comma separated hex-encoded addresses) to file specified by output-payload-filename", + ) + _ = Cmd.MarkFlagRequired("addresses") +} + +func run(*cobra.Command, []string) { + + if _, err := os.Stat(flagInputPayloadFileName); os.IsNotExist(err) { + log.Fatal().Msgf("Input file %s doesn't exist", flagInputPayloadFileName) + } + + if _, err := os.Stat(flagOutputPayloadFileName); os.IsExist(err) { + log.Fatal().Msgf("Output file %s exists", flagOutputPayloadFileName) + } + + addresses, err := parseAddresses(strings.Split(flagAddresses, ",")) + if err != nil { + log.Fatal().Err(err) + } + + log.Info().Msgf( + "extracting payloads with address %v from %s to %s", + addresses, + flagInputPayloadFileName, + flagOutputPayloadFileName, + ) + + numOfPayloadWritten, err := extractPayloads(log.Logger, flagInputPayloadFileName, flagOutputPayloadFileName, addresses) + if err != nil { + log.Fatal().Err(err) + } + + err = overwritePayloadCountInFile(flagOutputPayloadFileName, numOfPayloadWritten) + if err != nil { + log.Fatal().Err(err) + } +} + +func overwritePayloadCountInFile(output string, numOfPayloadWritten int) error { + in, err := os.OpenFile(output, os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("failed to open %s to write payload count: %w", output, err) + } + defer in.Close() + + var data [9]byte + data[0] = 0x1b + binary.BigEndian.PutUint64(data[1:], uint64(numOfPayloadWritten)) + + n, err := in.WriteAt(data[:], 0) + if err != nil { + return fmt.Errorf("failed to overwrite number of payloads in %s: %w", output, err) + } + if n != len(data) { + return fmt.Errorf("failed to overwrite number of payloads in %s: wrote %d bytes, expect %d bytes", output, n, len(data)) + } + + return nil +} + +func extractPayloads(log zerolog.Logger, input, output string, addresses []common.Address) (int, error) { + in, err := os.Open(input) + if err != nil { + return 0, fmt.Errorf("failed to open %s: %w", input, err) + } + defer in.Close() + + reader := bufio.NewReaderSize(in, defaultBufioReadSize) + if err != nil { + return 0, fmt.Errorf("failed to create bufio reader for %s: %w", input, err) + } + + out, err := os.Create(output) + if err != nil { + return 0, fmt.Errorf("failed to open %s: %w", output, err) + } + defer out.Close() + + writer := bufio.NewWriterSize(out, defaultBufioWriteSize) + if err != nil { + return 0, fmt.Errorf("failed to create bufio writer for %s: %w", output, err) + } + defer writer.Flush() + + // Preserve 9-bytes header for number of payloads. + var head [9]byte + _, err = writer.Write(head[:]) + if err != nil { + return 0, fmt.Errorf("failed to write header for %s: %w", output, err) + } + + // Need to flush buffer before encoding payloads. + writer.Flush() + + enc := cbor.NewEncoder(writer) + + const logIntervalForPayloads = 1_000_000 + count := 0 + err = readPayloadFile(log, reader, func(rawPayload []byte) error { + + payload, err := ledger.DecodePayloadWithoutPrefix(rawPayload, false, payloadEncodingVersion) + if err != nil { + return fmt.Errorf("failed to decode payload 0x%x: %w", rawPayload, err) + } + + k, err := payload.Key() + if err != nil { + return err + } + + owner := k.KeyParts[0].Value + + include := false + for _, address := range addresses { + if bytes.Equal(owner, address[:]) { + include = true + break + } + } + + if include { + err = enc.Encode(rawPayload) + if err != nil { + return fmt.Errorf("failed to encode payload: %w", err) + } + + count++ + if count%logIntervalForPayloads == 0 { + log.Info().Msgf("wrote %d payloads", count) + } + } + + return nil + }) + if err != nil { + return 0, err + } + + log.Info().Msgf("wrote %d payloads", count) + return count, nil +} + +func parseAddresses(hexAddresses []string) ([]common.Address, error) { + if len(hexAddresses) == 0 { + return nil, fmt.Errorf("at least one address must be provided") + } + + addresses := make([]common.Address, len(hexAddresses)) + for i, hexAddr := range hexAddresses { + b, err := hex.DecodeString(strings.TrimSpace(hexAddr)) + if err != nil { + return nil, fmt.Errorf("address is not hex encoded %s: %w", strings.TrimSpace(hexAddr), err) + } + + addr, err := common.BytesToAddress(b) + if err != nil { + return nil, fmt.Errorf("cannot decode address %x", b) + } + + addresses[i] = addr + } + + return addresses, nil +} + +func readPayloadFile(log zerolog.Logger, r io.Reader, processPayload func([]byte) error) error { + dec := cbor.NewDecoder(r) + + var payloadCount int + err := dec.Decode(&payloadCount) + if err != nil { + return err + } + + log.Info().Msgf("Processing input file with %d payloads", payloadCount) + + const logIntervalForPayloads = 1_000_000 + count := 0 + for { + var rawPayload []byte + err = dec.Decode(&rawPayload) + if err == io.EOF { + break + } + if err != nil { + return err + } + + err = processPayload(rawPayload) + if err != nil { + return err + } + + count++ + if count%logIntervalForPayloads == 0 { + log.Info().Msgf("processed %d payloads", count) + } + } + + log.Info().Msgf("processed %d payloads", count) + return nil +} diff --git a/cmd/util/cmd/extract-payloads-by-address/extract_payloads_test.go b/cmd/util/cmd/extract-payloads-by-address/extract_payloads_test.go new file mode 100644 index 00000000000..443fed54518 --- /dev/null +++ b/cmd/util/cmd/extract-payloads-by-address/extract_payloads_test.go @@ -0,0 +1,241 @@ +package extractpayloads + +import ( + "bytes" + "crypto/rand" + "encoding/hex" + "path/filepath" + "strings" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/utils/unittest" +) + +type keyPair struct { + key ledger.Key + value ledger.Value +} + +func TestExtractPayloads(t *testing.T) { + + t.Run("some payloads", func(t *testing.T) { + + unittest.RunWithTempDir(t, func(datadir string) { + + inputFile := filepath.Join(datadir, "input.payload") + outputFile := filepath.Join(datadir, "output.payload") + + size := 10 + + // Generate some data + keysValues := make(map[string]keyPair) + var payloads []*ledger.Payload + + for i := 0; i < size; i++ { + keys, values := getSampleKeyValues(i) + + for j, key := range keys { + keysValues[key.String()] = keyPair{ + key: key, + value: values[j], + } + + payloads = append(payloads, ledger.NewPayload(key, values[j])) + } + } + + numOfPayloadWritten, err := util.CreatePayloadFile(zerolog.Nop(), inputFile, payloads, nil) + require.NoError(t, err) + require.Equal(t, len(payloads), numOfPayloadWritten) + + const selectedAddressCount = 10 + selectedAddresses := make(map[string]struct{}) + selectedKeysValues := make(map[string]keyPair) + for k, kv := range keysValues { + owner := kv.key.KeyParts[0].Value + if len(owner) != common.AddressLength { + continue + } + + address, err := common.BytesToAddress(owner) + require.NoError(t, err) + + if len(selectedAddresses) < selectedAddressCount { + selectedAddresses[address.Hex()] = struct{}{} + } + + if _, exist := selectedAddresses[address.Hex()]; exist { + selectedKeysValues[k] = kv + } + } + + addresses := make([]string, 0, len(selectedAddresses)) + for address := range selectedAddresses { + addresses = append(addresses, address) + } + + // Export selected payloads + Cmd.SetArgs([]string{ + "--input-filename", inputFile, + "--output-filename", outputFile, + "--addresses", strings.Join(addresses, ","), + }) + + err = Cmd.Execute() + require.NoError(t, err) + + // Verify exported payloads. + payloadsFromFile, err := util.ReadPayloadFile(zerolog.Nop(), outputFile) + require.NoError(t, err) + require.Equal(t, len(selectedKeysValues), len(payloadsFromFile)) + + for _, payloadFromFile := range payloadsFromFile { + k, err := payloadFromFile.Key() + require.NoError(t, err) + + kv, exist := selectedKeysValues[k.String()] + require.True(t, exist) + require.Equal(t, kv.value, payloadFromFile.Value()) + } + }) + }) + + t.Run("no payloads", func(t *testing.T) { + + emptyAddress := common.Address{} + + unittest.RunWithTempDir(t, func(datadir string) { + + inputFile := filepath.Join(datadir, "input.payload") + outputFile := filepath.Join(datadir, "output.payload") + + size := 10 + + // Generate some data + keysValues := make(map[string]keyPair) + var payloads []*ledger.Payload + + for i := 0; i < size; i++ { + keys, values := getSampleKeyValues(i) + + for j, key := range keys { + if bytes.Equal(key.KeyParts[0].Value, emptyAddress[:]) { + continue + } + keysValues[key.String()] = keyPair{ + key: key, + value: values[j], + } + + payloads = append(payloads, ledger.NewPayload(key, values[j])) + } + } + + numOfPayloadWritten, err := util.CreatePayloadFile(zerolog.Nop(), inputFile, payloads, nil) + require.NoError(t, err) + require.Equal(t, len(payloads), numOfPayloadWritten) + + // Export selected payloads + Cmd.SetArgs([]string{ + "--input-filename", inputFile, + "--output-filename", outputFile, + "--addresses", hex.EncodeToString(emptyAddress[:]), + }) + + err = Cmd.Execute() + require.NoError(t, err) + + // Verify exported payloads. + payloadsFromFile, err := util.ReadPayloadFile(zerolog.Nop(), outputFile) + require.NoError(t, err) + require.Equal(t, 0, len(payloadsFromFile)) + }) + }) +} + +func getSampleKeyValues(i int) ([]ledger.Key, []ledger.Value) { + switch i { + case 0: + return []ledger.Key{getKey("", "uuid"), getKey("", "account_address_state")}, + []ledger.Value{[]byte{'1'}, []byte{'A'}} + case 1: + return []ledger.Key{getKey("ADDRESS", "public_key_count"), + getKey("ADDRESS", "public_key_0"), + getKey("ADDRESS", "exists"), + getKey("ADDRESS", "storage_used")}, + []ledger.Value{[]byte{1}, []byte("PUBLICKEYXYZ"), []byte{1}, []byte{100}} + case 2: + // TODO change the contract_names to CBOR encoding + return []ledger.Key{getKey("ADDRESS", "contract_names"), getKey("ADDRESS", "code.mycontract")}, + []ledger.Value{[]byte("mycontract"), []byte("CONTRACT Content")} + default: + keys := make([]ledger.Key, 0) + values := make([]ledger.Value, 0) + for j := 0; j < 10; j++ { + // address := make([]byte, 32) + address := make([]byte, 8) + _, err := rand.Read(address) + if err != nil { + panic(err) + } + keys = append(keys, getKey(string(address), "test")) + values = append(values, getRandomCadenceValue()) + } + return keys, values + } +} + +func getKey(owner, key string) ledger.Key { + return ledger.Key{KeyParts: []ledger.KeyPart{ + {Type: uint16(0), Value: []byte(owner)}, + {Type: uint16(2), Value: []byte(key)}, + }, + } +} + +func getRandomCadenceValue() ledger.Value { + + randomPart := make([]byte, 10) + _, err := rand.Read(randomPart) + if err != nil { + panic(err) + } + valueBytes := []byte{ + // magic prefix + 0x0, 0xca, 0xde, 0x0, 0x4, + // tag + 0xd8, 132, + // array, 5 items follow + 0x85, + + // tag + 0xd8, 193, + // UTF-8 string, length 4 + 0x64, + // t, e, s, t + 0x74, 0x65, 0x73, 0x74, + + // nil + 0xf6, + + // positive integer 1 + 0x1, + + // array, 0 items follow + 0x80, + + // UTF-8 string, length 10 + 0x6a, + 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, + } + + valueBytes = append(valueBytes, randomPart...) + return ledger.Value(valueBytes) +} From 75e10997e1b8b74d4b6a648412bbc59c6d8a1479 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 13 Feb 2024 18:28:43 -0600 Subject: [PATCH 2/2] Refactor to use named magic number for CBOR data --- cmd/util/cmd/extract-payloads-by-address/cmd.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/util/cmd/extract-payloads-by-address/cmd.go b/cmd/util/cmd/extract-payloads-by-address/cmd.go index acf54c07b49..3e384bf5d05 100644 --- a/cmd/util/cmd/extract-payloads-by-address/cmd.go +++ b/cmd/util/cmd/extract-payloads-by-address/cmd.go @@ -103,8 +103,10 @@ func overwritePayloadCountInFile(output string, numOfPayloadWritten int) error { } defer in.Close() + const cbor8BytesPositiveIntegerIndicator = 0x1b + var data [9]byte - data[0] = 0x1b + data[0] = cbor8BytesPositiveIntegerIndicator binary.BigEndian.PutUint64(data[1:], uint64(numOfPayloadWritten)) n, err := in.WriteAt(data[:], 0)