Skip to content

Commit

Permalink
Add raw transaction transaction proccessing and inserts.
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey committed Jan 16, 2025
1 parent 33b7eeb commit 24b85dc
Show file tree
Hide file tree
Showing 26 changed files with 1,398 additions and 138 deletions.
53 changes: 48 additions & 5 deletions blockchain/arbitrum_one/arbitrum_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,19 +662,20 @@ func (c *Client) DecodeProtoEntireBlockToJson(rawData *bytes.Buffer) (*seer_comm
return blocksBatchJson, nil
}

func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap map[string]map[string]*indexer.AbiEntry, threads int) ([]indexer.EventLabel, []indexer.TransactionLabel, error) {
func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap map[string]map[string]*indexer.AbiEntry, threads int) ([]indexer.EventLabel, []indexer.TransactionLabel, []indexer.RawTransaction, error) {
var protoBlocksBatch ArbitrumOneBlocksBatch

dataBytes := rawData.Bytes()

err := proto.Unmarshal(dataBytes, &protoBlocksBatch)
if err != nil {
return nil, nil, fmt.Errorf("failed to unmarshal data: %v", err)
return nil, nil, nil, fmt.Errorf("failed to unmarshal data: %v", err)
}

// Shared slices to collect labels
var labels []indexer.EventLabel
var txLabels []indexer.TransactionLabel
var rawTransactions []indexer.RawTransaction
var labelsMutex sync.Mutex

var decodeErr error
Expand All @@ -699,7 +700,7 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
// Local slices to collect labels for this block
var localEventLabels []indexer.EventLabel
var localTxLabels []indexer.TransactionLabel

localRawTransactions := make(map[string]indexer.RawTransaction)
for _, tx := range b.Transactions {
var decodedArgsTx map[string]interface{}

Expand Down Expand Up @@ -767,6 +768,25 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
continue
}

localRawTransactions[tx.Hash] = indexer.RawTransaction{
Hash: tx.Hash,
BlockHash: b.Hash,
BlockTimestamp: b.Timestamp,
BlockNumber: tx.BlockNumber,
FromAddress: tx.FromAddress,
ToAddress: tx.ToAddress,
Gas: tx.Gas,
GasPrice: tx.GasPrice,
Input: tx.Input,
Nonce: tx.Nonce,
MaxFeePerGas: tx.MaxFeePerGas,
MaxPriorityFeePerGas: tx.MaxPriorityFeePerGas,
TransactionIndex: tx.TransactionIndex,
TransactionType: tx.TransactionType,
Value: tx.Value,
L1BlockNumber: &b.L1BlockNumber,
}

// Convert transaction to label
transactionLabel := indexer.TransactionLabel{
Address: tx.ToAddress,
Expand Down Expand Up @@ -835,6 +855,26 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
errorChan <- fmt.Errorf("error converting decodedArgsLogs to JSON for tx %s: %v", e.TransactionHash, err)
continue
}
if _, exists := localRawTransactions[e.TransactionHash]; !exists {
localRawTransactions[e.TransactionHash] = indexer.RawTransaction{
Hash: tx.Hash,
BlockHash: b.Hash,
BlockTimestamp: b.Timestamp,
BlockNumber: tx.BlockNumber,
FromAddress: tx.FromAddress,
ToAddress: tx.ToAddress,
Gas: tx.Gas,
GasPrice: tx.GasPrice,
Input: tx.Input,
Nonce: tx.Nonce,
MaxFeePerGas: tx.MaxFeePerGas,
MaxPriorityFeePerGas: tx.MaxPriorityFeePerGas,
TransactionIndex: tx.TransactionIndex,
TransactionType: tx.TransactionType,
Value: tx.Value,
L1BlockNumber: &b.L1BlockNumber,
}
}
// Convert event to label
eventLabel := indexer.EventLabel{
Label: label,
Expand All @@ -858,6 +898,9 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
labelsMutex.Lock()
labels = append(labels, localEventLabels...)
txLabels = append(txLabels, localTxLabels...)
for _, rawTransaction := range localRawTransactions {
rawTransactions = append(rawTransactions, rawTransaction)
}
labelsMutex.Unlock()
}(b)
}
Expand All @@ -873,10 +916,10 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma

// If any errors occurred, return them
if len(errorMessages) > 0 {
return nil, nil, fmt.Errorf("errors occurred during processing:\n%s", strings.Join(errorMessages, "\n"))
return nil, nil, nil, fmt.Errorf("errors occurred during processing:\n%s", strings.Join(errorMessages, "\n"))
}

return labels, txLabels, nil
return labels, txLabels, rawTransactions, nil
}

func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCache map[uint64]uint64, abiMap map[string]map[string]*indexer.AbiEntry) ([]indexer.TransactionLabel, error) {
Expand Down
53 changes: 48 additions & 5 deletions blockchain/arbitrum_sepolia/arbitrum_sepolia.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,19 +662,20 @@ func (c *Client) DecodeProtoEntireBlockToJson(rawData *bytes.Buffer) (*seer_comm
return blocksBatchJson, nil
}

func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap map[string]map[string]*indexer.AbiEntry, threads int) ([]indexer.EventLabel, []indexer.TransactionLabel, error) {
func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap map[string]map[string]*indexer.AbiEntry, threads int) ([]indexer.EventLabel, []indexer.TransactionLabel, []indexer.RawTransaction, error) {
var protoBlocksBatch ArbitrumSepoliaBlocksBatch

dataBytes := rawData.Bytes()

err := proto.Unmarshal(dataBytes, &protoBlocksBatch)
if err != nil {
return nil, nil, fmt.Errorf("failed to unmarshal data: %v", err)
return nil, nil, nil, fmt.Errorf("failed to unmarshal data: %v", err)
}

// Shared slices to collect labels
var labels []indexer.EventLabel
var txLabels []indexer.TransactionLabel
var rawTransactions []indexer.RawTransaction
var labelsMutex sync.Mutex

var decodeErr error
Expand All @@ -699,7 +700,7 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
// Local slices to collect labels for this block
var localEventLabels []indexer.EventLabel
var localTxLabels []indexer.TransactionLabel

localRawTransactions := make(map[string]indexer.RawTransaction)
for _, tx := range b.Transactions {
var decodedArgsTx map[string]interface{}

Expand Down Expand Up @@ -767,6 +768,25 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
continue
}

localRawTransactions[tx.Hash] = indexer.RawTransaction{
Hash: tx.Hash,
BlockHash: b.Hash,
BlockTimestamp: b.Timestamp,
BlockNumber: tx.BlockNumber,
FromAddress: tx.FromAddress,
ToAddress: tx.ToAddress,
Gas: tx.Gas,
GasPrice: tx.GasPrice,
Input: tx.Input,
Nonce: tx.Nonce,
MaxFeePerGas: tx.MaxFeePerGas,
MaxPriorityFeePerGas: tx.MaxPriorityFeePerGas,
TransactionIndex: tx.TransactionIndex,
TransactionType: tx.TransactionType,
Value: tx.Value,
L1BlockNumber: &b.L1BlockNumber,
}

// Convert transaction to label
transactionLabel := indexer.TransactionLabel{
Address: tx.ToAddress,
Expand Down Expand Up @@ -835,6 +855,26 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
errorChan <- fmt.Errorf("error converting decodedArgsLogs to JSON for tx %s: %v", e.TransactionHash, err)
continue
}
if _, exists := localRawTransactions[e.TransactionHash]; !exists {
localRawTransactions[e.TransactionHash] = indexer.RawTransaction{
Hash: tx.Hash,
BlockHash: b.Hash,
BlockTimestamp: b.Timestamp,
BlockNumber: tx.BlockNumber,
FromAddress: tx.FromAddress,
ToAddress: tx.ToAddress,
Gas: tx.Gas,
GasPrice: tx.GasPrice,
Input: tx.Input,
Nonce: tx.Nonce,
MaxFeePerGas: tx.MaxFeePerGas,
MaxPriorityFeePerGas: tx.MaxPriorityFeePerGas,
TransactionIndex: tx.TransactionIndex,
TransactionType: tx.TransactionType,
Value: tx.Value,
L1BlockNumber: &b.L1BlockNumber,
}
}
// Convert event to label
eventLabel := indexer.EventLabel{
Label: label,
Expand All @@ -858,6 +898,9 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
labelsMutex.Lock()
labels = append(labels, localEventLabels...)
txLabels = append(txLabels, localTxLabels...)
for _, rawTransaction := range localRawTransactions {
rawTransactions = append(rawTransactions, rawTransaction)
}
labelsMutex.Unlock()
}(b)
}
Expand All @@ -873,10 +916,10 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma

// If any errors occurred, return them
if len(errorMessages) > 0 {
return nil, nil, fmt.Errorf("errors occurred during processing:\n%s", strings.Join(errorMessages, "\n"))
return nil, nil, nil, fmt.Errorf("errors occurred during processing:\n%s", strings.Join(errorMessages, "\n"))
}

return labels, txLabels, nil
return labels, txLabels, rawTransactions, nil
}

func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCache map[uint64]uint64, abiMap map[string]map[string]*indexer.AbiEntry) ([]indexer.TransactionLabel, error) {
Expand Down
51 changes: 46 additions & 5 deletions blockchain/b3/b3.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,19 +652,20 @@ func (c *Client) DecodeProtoEntireBlockToJson(rawData *bytes.Buffer) (*seer_comm
return blocksBatchJson, nil
}

func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap map[string]map[string]*indexer.AbiEntry, threads int) ([]indexer.EventLabel, []indexer.TransactionLabel, error) {
func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap map[string]map[string]*indexer.AbiEntry, threads int) ([]indexer.EventLabel, []indexer.TransactionLabel, []indexer.RawTransaction, error) {
var protoBlocksBatch B3BlocksBatch

dataBytes := rawData.Bytes()

err := proto.Unmarshal(dataBytes, &protoBlocksBatch)
if err != nil {
return nil, nil, fmt.Errorf("failed to unmarshal data: %v", err)
return nil, nil, nil, fmt.Errorf("failed to unmarshal data: %v", err)
}

// Shared slices to collect labels
var labels []indexer.EventLabel
var txLabels []indexer.TransactionLabel
var rawTransactions []indexer.RawTransaction
var labelsMutex sync.Mutex

var decodeErr error
Expand All @@ -689,7 +690,7 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
// Local slices to collect labels for this block
var localEventLabels []indexer.EventLabel
var localTxLabels []indexer.TransactionLabel

localRawTransactions := make(map[string]indexer.RawTransaction)
for _, tx := range b.Transactions {
var decodedArgsTx map[string]interface{}

Expand Down Expand Up @@ -757,6 +758,24 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
continue
}

localRawTransactions[tx.Hash] = indexer.RawTransaction{
Hash: tx.Hash,
BlockHash: b.Hash,
BlockTimestamp: b.Timestamp,
BlockNumber: tx.BlockNumber,
FromAddress: tx.FromAddress,
ToAddress: tx.ToAddress,
Gas: tx.Gas,
GasPrice: tx.GasPrice,
Input: tx.Input,
Nonce: tx.Nonce,
MaxFeePerGas: tx.MaxFeePerGas,
MaxPriorityFeePerGas: tx.MaxPriorityFeePerGas,
TransactionIndex: tx.TransactionIndex,
TransactionType: tx.TransactionType,
Value: tx.Value,
}

// Convert transaction to label
transactionLabel := indexer.TransactionLabel{
Address: tx.ToAddress,
Expand Down Expand Up @@ -825,6 +844,25 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
errorChan <- fmt.Errorf("error converting decodedArgsLogs to JSON for tx %s: %v", e.TransactionHash, err)
continue
}
if _, exists := localRawTransactions[e.TransactionHash]; !exists {
localRawTransactions[e.TransactionHash] = indexer.RawTransaction{
Hash: tx.Hash,
BlockHash: b.Hash,
BlockTimestamp: b.Timestamp,
BlockNumber: tx.BlockNumber,
FromAddress: tx.FromAddress,
ToAddress: tx.ToAddress,
Gas: tx.Gas,
GasPrice: tx.GasPrice,
Input: tx.Input,
Nonce: tx.Nonce,
MaxFeePerGas: tx.MaxFeePerGas,
MaxPriorityFeePerGas: tx.MaxPriorityFeePerGas,
TransactionIndex: tx.TransactionIndex,
TransactionType: tx.TransactionType,
Value: tx.Value,
}
}
// Convert event to label
eventLabel := indexer.EventLabel{
Label: label,
Expand All @@ -848,6 +886,9 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma
labelsMutex.Lock()
labels = append(labels, localEventLabels...)
txLabels = append(txLabels, localTxLabels...)
for _, rawTransaction := range localRawTransactions {
rawTransactions = append(rawTransactions, rawTransaction)
}
labelsMutex.Unlock()
}(b)
}
Expand All @@ -863,10 +904,10 @@ func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, abiMap ma

// If any errors occurred, return them
if len(errorMessages) > 0 {
return nil, nil, fmt.Errorf("errors occurred during processing:\n%s", strings.Join(errorMessages, "\n"))
return nil, nil, nil, fmt.Errorf("errors occurred during processing:\n%s", strings.Join(errorMessages, "\n"))
}

return labels, txLabels, nil
return labels, txLabels, rawTransactions, nil
}

func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCache map[uint64]uint64, abiMap map[string]map[string]*indexer.AbiEntry) ([]indexer.TransactionLabel, error) {
Expand Down
Loading

0 comments on commit 24b85dc

Please sign in to comment.