diff --git a/cmd/harvester/arbitrum_harvester/main.go b/cmd/harvester/arbitrum_harvester/main.go deleted file mode 100644 index dc369d57..00000000 --- a/cmd/harvester/arbitrum_harvester/main.go +++ /dev/null @@ -1,87 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "time" - - "github.com/jackc/pgx/v4/pgxpool" - "go.uber.org/zap" - - "github.com/momentum-xyz/ubercontroller/config" - "github.com/momentum-xyz/ubercontroller/harvester" - "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" -) - -func main() { - fmt.Println("Harvester Debugger") - - logger, _ := zap.NewProduction() - cfg, err := config.GetConfig() - if err != nil { - logger.Fatal("failed to create db pool") - } - pgConfig, err := cfg.Postgres.GenConfig(logger) - if err != nil { - logger.Fatal("failed to create db config") - } - pool, err := pgxpool.ConnectConfig(context.Background(), pgConfig) - if err != nil { - logger.Fatal("failed to create db pool") - } - defer pool.Close() - - // ** Harvester - var harv harvester.IHarvester - harv = harvester.NewHarvester(pool) - - // ** Ethereum Adapter - adapter := arbitrum_nova_adapter.NewArbitrumNovaAdapter(cfg, logger.Sugar()) - adapter.Run() - _, _, _ = adapter.GetInfo() - if err := harv.RegisterAdapter(adapter); err != nil { - log.Fatal(err) - } - - // ** Harvester Clients - testHandler1 := testHandler1 - ptrTestHandler1 := &testHandler1 - harv.Subscribe(harvester.ArbitrumNova, harvester.NewBlock, ptrTestHandler1) - - type pair struct { - Wallet string - Contract string - } - - pairs := []pair{ - //{ - // Wallet: "0x31854122F629B1B1E3b2aA85336F7b68f83924fA", - // Contract: "0x556353dab72b2F3223de2B2ac69700B3F280d357", - //}, - { - Wallet: "0xe2148ee53c0755215df69b2616e552154edc584f", - Contract: "0x7F85fB7f42A0c0D40431cc0f7DFDf88be6495e67", - }, - } - - for _, pair := range pairs { - err = harv.SubscribeForWalletAndContract(harvester.ArbitrumNova, pair.Wallet, pair.Contract, ptrTestHandler1) - if err != nil { - panic(err) - } - } - - time.Sleep(time.Second * 300) - harv.Unsubscribe(harvester.ArbitrumNova, harvester.NewBlock, ptrTestHandler1) - - time.Sleep(time.Second * 500) -} - -func testHandler1(p any) { - fmt.Printf("testHandler1: %+v \n", p) -} - -func testHandler2(p any) { - fmt.Printf("testHandler2: %+v \n", p) -} diff --git a/cmd/harvester/arbitrum_harvester_start/main.go b/cmd/harvester/arbitrum_harvester_start/main.go deleted file mode 100644 index 5a74c1ce..00000000 --- a/cmd/harvester/arbitrum_harvester_start/main.go +++ /dev/null @@ -1,52 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "time" - - "github.com/jackc/pgx/v4/pgxpool" - "go.uber.org/zap" - - "github.com/momentum-xyz/ubercontroller/config" - "github.com/momentum-xyz/ubercontroller/harvester" - "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" -) - -func main() { - fmt.Println("Harvester Debugger") - - logger, _ := zap.NewProduction() - cfg, err := config.GetConfig() - if err != nil { - logger.Fatal(fmt.Sprintf("failed to get config: %s", err)) - } - pgConfig, err := cfg.Postgres.GenConfig(logger) - if err != nil { - logger.Fatal(fmt.Sprintf("failed to create db config: %s", err)) - } - pool, err := pgxpool.ConnectConfig(context.Background(), pgConfig) - if err != nil { - logger.Fatal(fmt.Sprintf("failed to create db pool: %s", err)) - } - defer pool.Close() - - harvester.Initialise(context.Background(), logger.Sugar(), cfg, pool) - - // ** Ethereum Adapter - adapter := arbitrum_nova_adapter.NewArbitrumNovaAdapter(cfg, logger.Sugar()) - adapter.Run() - _, _, _ = adapter.GetInfo() - if err := harvester.GetInstance().RegisterAdapter(adapter); err != nil { - log.Fatal(err) - } - - err = harvester.SubscribeAllWallets(context.Background(), harvester.GetInstance(), cfg, pool) - if err != nil { - log.Fatal(err) - } - - time.Sleep(time.Second * 3) - fmt.Println(err) -} diff --git a/cmd/harvester/arbitrum_table/main.go b/cmd/harvester/arbitrum_table/main.go deleted file mode 100644 index 25ed3f7b..00000000 --- a/cmd/harvester/arbitrum_table/main.go +++ /dev/null @@ -1,65 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "time" - - "github.com/jackc/pgx/v4/pgxpool" - "go.uber.org/zap" - - "github.com/momentum-xyz/ubercontroller/config" - "github.com/momentum-xyz/ubercontroller/harvester" - "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" -) - -func main() { - fmt.Println("Arbitrum Table Debugger") - - cfg, err := config.GetConfig() - if err != nil { - log.Fatal("failed to get config") - } - logger, _ := zap.NewProduction() - pgConfig, err := cfg.Postgres.GenConfig(logger) - if err != nil { - log.Fatalf("failed to create db config: %s", err) - } - pool, err := pgxpool.ConnectConfig(context.Background(), pgConfig) - if err != nil { - log.Fatal("failed to create db pool") - } - defer pool.Close() - - a := arbitrum_nova_adapter.NewArbitrumNovaAdapter(cfg, logger.Sugar()) - a.Run() - - t := harvester.NewTable(pool, a, listener) - t.Run() - - //token := "0x85F17Cf997934a597031b2E18a9aB6ebD4B9f6a4" - //wallet := "0x0c3A3040075dd985F141800a1392a0Db81A09cAd" - //t.AddWalletContract(wallet, token) - - wallet2 := "0x2813fd17ea95b2655a7228383c5236e31090419e" - token2 := "0xdefa4e8a7bcba345f687a2f1456f5edd9ce97202" - //wallet2Receiver := "0x3f363b4e038a6e43ce8321c50f3efbf460196d4b" - //amount := "33190774000000000000000" - t.AddWalletContract(wallet2, token2) - t.AddWalletContract("0x3f363b4e038a6e43ce8321c50f3efbf460196d4b", token2) - - time.Sleep(time.Second * 30) - t.Display() - -} - -func listener(bcName string, events []*harvester.UpdateEvent, stakes []*harvester.StakeEvent) { - fmt.Printf("Table Listener: \n") - //for k, v := range events { - // fmt.Printf("%+v %+v %+v %+v \n", k, v.Wallet, v.Contract, v.Amount.String()) - //} - for k, v := range stakes { - fmt.Printf("%+v %+v %+v %+v \n", k, v.Wallet, v.OdysseyID, v.Amount.String()) - } -} diff --git a/cmd/harvester/harvester/main.go b/cmd/harvester/harvester/main.go deleted file mode 100644 index ab0cdeb2..00000000 --- a/cmd/harvester/harvester/main.go +++ /dev/null @@ -1,97 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "time" - - "github.com/jackc/pgx/v4/pgxpool" - "go.uber.org/zap" - - "github.com/momentum-xyz/ubercontroller/config" - "github.com/momentum-xyz/ubercontroller/harvester" - "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" -) - -func main() { - fmt.Println("Harvester Debugger") - - cfg, err := config.GetConfig() - if err != nil { - log.Fatal("failed to get config") - } - logger, _ := zap.NewProduction() - pgConfig, err := cfg.Postgres.GenConfig(logger) - if err != nil { - log.Fatal("failed to get db config") - } - pool, err := pgxpool.ConnectConfig(context.Background(), pgConfig) - if err != nil { - log.Fatal("failed to create db pool") - } - defer pool.Close() - - // ** Harvester - var harv harvester.IHarvester - harv = harvester.NewHarvester(pool) - - // ** Adapter - adapter := arbitrum_nova_adapter.NewArbitrumNovaAdapter(cfg, logger.Sugar()) - adapter.Run() - _, _, _ = adapter.GetInfo() - if err := harv.RegisterAdapter(adapter); err != nil { - log.Fatal(err) - } - - // ** Harvester Clients - testHandler1 := testHandler1 - ptrTestHandler1 := &testHandler1 - harv.Subscribe(harvester.ArbitrumNova, harvester.NewBlock, ptrTestHandler1) - - testHandler2 := testHandler2 - ptrTestHandler2 := &testHandler2 - - //wallet1 := "0x9592b70a5a6c8ece2ef55547c3f07f1862372fd1" - //contract1 := "0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae" - //contract2 := "0xde0b295669a9fd93d5f28d9ec85e40f4cb697ccc" - // - //wallet2 := "0x31854122F629B1B1E3b2aA85336F7b68f83924fA" - //contract3 := "0x556353dab72b2F3223de2B2ac69700B3F280d357" - - type pair struct { - Wallet string - Contract string - } - - pairs := []pair{ - //{ - // Wallet: "0x31854122F629B1B1E3b2aA85336F7b68f83924fA", - // Contract: "0x556353dab72b2F3223de2B2ac69700B3F280d357", - //}, - { - Wallet: "0x15c7152B3b02324d17e753E4cfF65C0f1759812B", - Contract: "0x556353dab72b2F3223de2B2ac69700B3F280d357", - }, - } - - for _, pair := range pairs { - err = harv.SubscribeForWalletAndContract(harvester.ArbitrumNova, pair.Wallet, pair.Contract, ptrTestHandler2) - if err != nil { - panic(err) - } - } - - time.Sleep(time.Second * 30) - harv.Unsubscribe(harvester.ArbitrumNova, harvester.NewBlock, ptrTestHandler2) - - time.Sleep(time.Second * 500) -} - -func testHandler1(p any) { - fmt.Printf("testHandler1: %+v \n", p) -} - -func testHandler2(p any) { - fmt.Printf("testHandler2: %+v \n", p) -} diff --git a/cmd/harvester/table/main.go b/cmd/harvester/table/main.go deleted file mode 100644 index 7a9c1ecd..00000000 --- a/cmd/harvester/table/main.go +++ /dev/null @@ -1,59 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "time" - - "github.com/jackc/pgx/v4/pgxpool" - "go.uber.org/zap" - - "github.com/momentum-xyz/ubercontroller/config" - "github.com/momentum-xyz/ubercontroller/harvester" - "github.com/momentum-xyz/ubercontroller/harvester/ethereum_adapter" -) - -func main() { - fmt.Println("Harvester Debugger") - - cfg, err := config.GetConfig() - if err != nil { - log.Fatal("failed to get config") - } - logger, _ := zap.NewProduction() - pgConfig, err := cfg.Postgres.GenConfig(logger) - pool, err := pgxpool.ConnectConfig(context.Background(), pgConfig) - if err != nil { - log.Fatal("failed to create db pool") - } - defer pool.Close() - - a := ethereum_adapter.NewEthereumAdapter() - a.Run() - - t := harvester.NewTable(pool, a, listener) - t.Run() - - //token := "0x85F17Cf997934a597031b2E18a9aB6ebD4B9f6a4" - //wallet := "0x0c3A3040075dd985F141800a1392a0Db81A09cAd" - //t.AddWalletContract(wallet, token) - - wallet2 := "0x2813fd17ea95b2655a7228383c5236e31090419e" - token2 := "0xdefa4e8a7bcba345f687a2f1456f5edd9ce97202" - //wallet2Receiver := "0x3f363b4e038a6e43ce8321c50f3efbf460196d4b" - //amount := "33190774000000000000000" - t.AddWalletContract(wallet2, token2) - t.AddWalletContract("0x3f363b4e038a6e43ce8321c50f3efbf460196d4b", token2) - - time.Sleep(time.Second * 30) - t.Display() - -} - -func listener(bcName string, events []*harvester.UpdateEvent, stakeEvents []*harvester.StakeEvent) { - fmt.Printf("Table Listener: \n") - for k, v := range events { - fmt.Printf("%+v %+v %+v %+v \n", k, v.Wallet, v.Contract, v.Amount.String()) - } -} diff --git a/config/Arbitrum.go b/config/Arbitrum.go index d51edb3b..3b4edbf5 100644 --- a/config/Arbitrum.go +++ b/config/Arbitrum.go @@ -6,6 +6,7 @@ type Arbitrum struct { DADTokenAddress string `yaml:"arbitrum_dad_token_address" envconfig:"ARBITRUM_DAD_TOKEN_ADDRESS"` StakeAddress string `yaml:"arbitrum_stake_token_address" envconfig:"ARBITRUM_STAKE_ADDRESS"` NFTAddress string `yaml:"arbitrum_nft_address" envconfig:"ARBITRUM_NFT_ADDRESS"` + NodeAddress string `yaml:"arbitrum_node_address" envconfig:"ARBITRUM_NODE_ADDRESS"` MintNFTAmount string `json:"arbitrum_mint_nft_amount" envconfig:"ARBITRUM_MINT_NFT_AMOUNT"` MintNFTDeposit string `json:"arbitrum_mint_nft_deposit_address" envconfig:"ARBITRUM_MINT_NFT_DEPOSIT_ADDRESS"` FaucetAddress string `yaml:"arbitrum_faucet_address" envconfig:"ARBITRUM_FAUCET_ADDRESS"` @@ -20,6 +21,7 @@ func (a *Arbitrum) Init() { a.DADTokenAddress = "0xfCa1B6bD67AeF9a9E7047bf7D3949f40E8dde18d" a.StakeAddress = "0x18f3FEE919DBc22b5a68401298B01dcd46ab8665" a.NFTAddress = "0xbc48cb82903f537614E0309CaF6fe8cEeBa3d174" + a.NodeAddress = "0x19DcA2dd179A260e9Fe88ea6c821d237EA10Bfd8" a.FaucetAddress = "0x9E760F1CddA0694B6156076C60657118CF874289" a.MintNFTAmount = "4.20" a.MintNFTDeposit = "0x683642c22feDE752415D4793832Ab75EFdF6223c" diff --git a/harvester/arbitrum_nova_adapter/abi/nft.json b/contracter/arbitrum_nova_adapter/abi/nft.json similarity index 100% rename from harvester/arbitrum_nova_adapter/abi/nft.json rename to contracter/arbitrum_nova_adapter/abi/nft.json diff --git a/harvester/arbitrum_nova_adapter/abi/staking.json b/contracter/arbitrum_nova_adapter/abi/staking.json similarity index 100% rename from harvester/arbitrum_nova_adapter/abi/staking.json rename to contracter/arbitrum_nova_adapter/abi/staking.json diff --git a/harvester/arbitrum_nova_adapter/abi/token.json b/contracter/arbitrum_nova_adapter/abi/token.json similarity index 100% rename from harvester/arbitrum_nova_adapter/abi/token.json rename to contracter/arbitrum_nova_adapter/abi/token.json diff --git a/contracter/arbitrum_nova_adapter/arbitrum_nova_adapter.go b/contracter/arbitrum_nova_adapter/arbitrum_nova_adapter.go new file mode 100644 index 00000000..6e240093 --- /dev/null +++ b/contracter/arbitrum_nova_adapter/arbitrum_nova_adapter.go @@ -0,0 +1,458 @@ +package arbitrum_nova_adapter + +import ( + "context" + "fmt" + "log" + "math" + "math/big" + "strconv" + "strings" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rpc" + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/momentum-xyz/ubercontroller/config" + "github.com/momentum-xyz/ubercontroller/contracter" + "github.com/momentum-xyz/ubercontroller/utils/umid" +) + +type ArbitrumNovaAdapter struct { + logger *zap.SugaredLogger + listener contracter.AdapterListener + umid umid.UMID + wsURL string + httpURL string + name string + //client *ethclient.Client + rpcClient *rpc.Client + lastBlock uint64 + contracts *Contracts +} + +func NewArbitrumNovaAdapter(cfg *config.Config, logger *zap.SugaredLogger) *ArbitrumNovaAdapter { + return &ArbitrumNovaAdapter{ + logger: logger, + umid: umid.MustParse("ccccaaaa-1111-2222-3333-222222222222"), + wsURL: cfg.Arbitrum.WSURL, + httpURL: cfg.Arbitrum.RPCURL, + name: "arbitrum_nova", + contracts: NewContracts(&cfg.Arbitrum), + } +} + +func (a *ArbitrumNovaAdapter) GetLastBlockNumber() (uint64, error) { + var resp string + if err := a.rpcClient.Call(&resp, "eth_blockNumber"); err != nil { + return 0, errors.WithMessage(err, "failed to make RPC call to arbitrum:") + } + + return hex2int(resp), nil +} + +func (a *ArbitrumNovaAdapter) Run() { + var err error + a.rpcClient, err = rpc.DialHTTP(a.httpURL) + if err != nil { + log.Fatal(err) + } + + a.logger.Debugf("Connected to Arbitrum Block Chain: %s", a.httpURL) + /////// + + ticker := time.NewTicker(1000 * time.Millisecond) + done := make(chan bool) + go func() { + for { + select { + case <-done: + return + case t := <-ticker.C: + _ = t + //fmt.Println("Tick at", t) + n, err := a.GetLastBlockNumber() + if err != nil { + a.logger.Error(err) + } + if a.lastBlock < n { + a.lastBlock = n + if a.listener != nil { + a.listener(n, nil, nil) + } + } + } + } + }() +} + +func (a *ArbitrumNovaAdapter) RegisterNewBlockListener(f contracter.AdapterListener) { + a.listener = f +} + +func (a *ArbitrumNovaAdapter) GetBalance(wallet string, contract string, blockNumber uint64) (*big.Int, error) { + type request struct { + To string `json:"to"` + Data string `json:"data"` + } + + // "0x70a08231" - crypto.Keccak256Hash([]byte("balanceOf(address)")).String()[0:10] + data := "0x70a08231" + fmt.Sprintf("%064s", wallet[2:]) // %064s means that the string is padded with 0 to 64 bytes + req := request{contract, data} + + var resp string + n := hexutil.EncodeUint64(blockNumber) + if err := a.rpcClient.Call(&resp, "eth_call", req, n); err != nil { + return nil, errors.WithMessage(err, "failed to make RPC call to arbitrum:") + } + + // remove leading zero of resp + t := strings.TrimLeft(resp[2:], "0") + if t == "" { + t = "0" + } + s := "0x" + t + balance, err := hexutil.DecodeBig(s) + if err != nil { + log.Fatal(err) + } + + return balance, nil +} + +func (a *ArbitrumNovaAdapter) GetTransactionMessage(tx *types.Transaction) *core.Message { + msg, err := core.TransactionToMessage(tx, types.LatestSignerForChainID(tx.ChainId()), nil) + if err != nil { + log.Fatal(err) + } + return msg +} + +// refer https://github.com/ethereum/web3.py/blob/master/web3/contract.py#L435 +func (a *ArbitrumNovaAdapter) DecodeTransactionInputData(contractABI *abi.ABI, data []byte) (string, map[string]any, error) { + // The first 4 bytes of the txn represent the ID of the method in the ABI + //fmt.Println(len(data)) + methodSigData := data[:4] + method, err := contractABI.MethodById(methodSigData) + if err != nil { + err = errors.WithMessage(err, "failed to get ABI contract method by id") + return "", nil, err + } + + // parse the inputs to this method + inputsSigData := data[4:] + inputsMap := make(map[string]interface{}) + if err := method.Inputs.UnpackIntoMap(inputsMap, inputsSigData); err != nil { + err = errors.WithMessage(err, "failed to unpack ABI contract method into map") + return "", nil, err + } + //fmt.Printf("Method Name: %s\n", method.Name) + //fmt.Printf("Method inputs: %v\n", MapToJson(inputsMap)) + + return method.Name, inputsMap, nil +} + +func (a *ArbitrumNovaAdapter) GetLogsRecursively(fromBlock, toBlock int64, contracts []common.Address, level int) ([]any, error) { + + a.logger.Debugln("GET: ", strconv.Itoa(level), fromBlock, toBlock) + if level > 3 { + return nil, errors.New("GetLogsWrapper maximum recursion level") + } + + logs, err := a.GetLogs(fromBlock, toBlock, contracts) + if err != nil { + a.logger.Error(err) + allLogs := make([]any, 0) + parts := int64(7) + if toBlock-fromBlock < parts { + return nil, errors.WithMessage(err, "can not split getLogs to parts") + } + + step, _ := math.Modf(float64((toBlock - fromBlock) / parts)) + for i := 1; i <= int(parts); i++ { + + from := int64(fromBlock) + int64(i-1)*int64(step) + to := int64(fromBlock) + int64(i)*int64(step) - 1 + if int64(i) == parts { + to = toBlock + } + + l, err := a.GetLogsRecursively(from, to, contracts, level+1) + if err != nil { + return nil, errors.WithMessage(err, "recursive call error") + } + a.logger.Debugln(i, from, to) + allLogs = append(allLogs, l...) + } + + return allLogs, nil + } + + a.logger.Debugln("RETURN: ", fromBlock, toBlock, len(logs)) + return logs, err + +} + +func (a *ArbitrumNovaAdapter) GetLogs(fromBlock, toBlock int64, contracts []common.Address) ([]any, error) { + + if contracts == nil { + contracts = a.contracts.AllAddresses + } + + query := ethereum.FilterQuery{ + FromBlock: big.NewInt(fromBlock), + ToBlock: big.NewInt(toBlock), + Addresses: contracts, + } + + bcLogs, err := a.FilterLogs(context.TODO(), query) + if err != nil { + return nil, errors.WithMessage(err, "failed to filter log") + } + + logs := make([]any, 0) + + logTransferSig := []byte("Transfer(address,address,uint256)") + logTransferSigHash := crypto.Keccak256Hash(logTransferSig) + + logStakeSigHash := crypto.Keccak256Hash([]byte(a.contracts.StakeABI.Events["Stake"].Sig)) + logUnstakeSigHash := a.contracts.StakeABI.Events["Unstake"].ID + logRestakeSigHash := a.contracts.StakeABI.Events["Restake"].ID + + logTransferNftHash := a.contracts.NftABI.Events["Transfer"].ID + + logTransferOdysseySig := []byte("NodeMngmtEvent(uint256,uint256,uint256)") + logTransferOdysseySigHash := crypto.Keccak256Hash(logTransferOdysseySig) + + for _, vLog := range bcLogs { + //fmt.Printf("Log Block Number: %d\n", vLog.BlockNumber) + //fmt.Printf("Log Index: %d\n", vLog.Index) + + // Iterate contracts + switch vLog.Address.Hex() { + case a.contracts.momTokenAddress.Hex(), a.contracts.dadTokenAddress.Hex(): + switch vLog.Topics[0].Hex() { + case logTransferSigHash.Hex(): + //fmt.Printf("Log Name: Transfer\n") + + //var transferEvent contracter.BCDiff + + var e contracter.TransferERC20Log + + ev, err := a.contracts.TokenABI.Unpack("Transfer", vLog.Data) + if err != nil { + return nil, errors.WithMessage(err, "failed to unpack event from ABI") + } + + e.Contract = strings.ToLower(vLog.Address.Hex()) + // Hex and Un Hex here used to remove padding zeros + e.From = strings.ToLower(common.HexToAddress(vLog.Topics[1].Hex()).Hex()) + e.To = strings.ToLower(common.HexToAddress(vLog.Topics[2].Hex()).Hex()) + if len(ev) > 0 { + e.Value = ev[0].(*big.Int) + } + + logs = append(logs, &e) + } + + case a.contracts.stakeAddress.Hex(): + switch vLog.Topics[0].Hex() { + + case logStakeSigHash.Hex(): + ev, err := a.contracts.StakeABI.Unpack("Stake", vLog.Data) + if err != nil { + return nil, errors.WithMessage(err, "failed to unpack event from ABI") + } + + // Hack to remove extra zeroes + fromWallet := common.HexToAddress(vLog.Topics[1].Hex()) + + b := vLog.Topics[2].Bytes()[16:] + odysseyID, err := umid.FromBytes(b) + if err != nil { + return nil, errors.WithMessage(err, "failed to parse umid from bytes") + } + if odysseyID == umid.MustParse("ccccaaaa-1111-2222-3333-222222222222") || + odysseyID == umid.MustParse("ccccaaaa-1111-2222-3333-222222222244") || + odysseyID == umid.MustParse("ccccaaaa-1111-2222-3333-222222222241") { + // Skip test Odyssey IDs + continue + } + + transactionHash := vLog.TxHash.Hex() + amount := ev[0].(*big.Int) + tokenType := ev[1].(uint8) + totalAmount := ev[2].(*big.Int) + + e := &contracter.StakeLog{ + TxHash: transactionHash, + LogIndex: vLog.Index, + UserWallet: fromWallet.Hex(), + OdysseyID: odysseyID, + AmountStaked: amount, + TokenType: tokenType, + TotalStaked: totalAmount, + } + + logs = append(logs, e) + + case logUnstakeSigHash.Hex(): + ev, err := a.contracts.StakeABI.Unpack("Unstake", vLog.Data) + if err != nil { + return nil, errors.WithMessage(err, "failed to unpack event from ABI") + } + + // Hack to remove extra zeroes + fromWallet := common.HexToAddress(vLog.Topics[1].Hex()) + + b1 := vLog.Topics[2].Bytes() + b2 := b1[16:] + odysseyID, err := umid.FromBytes(b2) + if err != nil { + return nil, errors.WithMessage(err, "failed to parse umid from bytes") + } + + transactionHash := vLog.TxHash.Hex() + amount := ev[0].(*big.Int) + tokenType := ev[1].(uint8) + totalAmount := ev[2].(*big.Int) + + e := &contracter.UnstakeLog{ + TxHash: transactionHash, + LogIndex: vLog.Index, + UserWallet: fromWallet.Hex(), + OdysseyID: odysseyID, + AmountUnstaked: amount, + TokenType: tokenType, + TotalStaked: totalAmount, + } + + logs = append(logs, e) + + case logRestakeSigHash.Hex(): + a.logger.Debugln("Restake") + } + case a.contracts.nftAddress.Hex(): + a.logger.Debugln("NFT") + + switch vLog.Topics[0].Hex() { + case logTransferNftHash.Hex(): + // TODO Not sure why vLog.Data is empty + //ev, err := a.contracts.NftABI.Unpack("Transfer", vLog.Data) + //if err != nil { + // return nil, errors.WithMessage(err, "failed to unpack event from ABI") + //} + + from := strings.ToLower(common.HexToAddress(vLog.Topics[1].Hex()).Hex()) + to := strings.ToLower(common.HexToAddress(vLog.Topics[2].Hex()).Hex()) + itemID := vLog.Topics[3].Big() + + var id umid.UMID + itemID.FillBytes(id[:]) + + if err != nil { + return nil, errors.WithMessage(err, "failed to read umid from bytes") + } + + e := &contracter.TransferNFTLog{ + From: from, + To: to, + TokenID: id, + Contract: strings.ToLower(vLog.Address.Hex()), + } + + logs = append(logs, e) + } + case a.contracts.nodeAddress.Hex(): + switch vLog.Topics[0].Hex() { + case logTransferOdysseySigHash.Hex(): + + var fromID, toID, odysseyID umid.UMID + vLog.Topics[1].Big().FillBytes(fromID[:]) + vLog.Topics[2].Big().FillBytes(toID[:]) + vLog.Topics[3].Big().FillBytes(odysseyID[:]) + + e := &contracter.TransferOdysseyLog{ + FromNodeID: fromID, + ToNodeID: toID, + OdysseyID: odysseyID, + } + + logs = append(logs, e) + } + } + } + + return logs, nil +} + +func (a *ArbitrumNovaAdapter) GetInfo() (umid umid.UMID, name string, rpcURL string) { + return a.umid, a.name, a.wsURL +} + +func (a *ArbitrumNovaAdapter) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { + var result []types.Log + arg, err := toFilterArg(q) + if err != nil { + return nil, err + } + err = a.rpcClient.CallContext(ctx, &result, "eth_getLogs", arg) + return result, err +} + +func toFilterArg(q ethereum.FilterQuery) (interface{}, error) { + arg := map[string]interface{}{ + "address": q.Addresses, + "topics": q.Topics, + } + if q.BlockHash != nil { + arg["blockHash"] = *q.BlockHash + if q.FromBlock != nil || q.ToBlock != nil { + return nil, fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock") + } + } else { + if q.FromBlock == nil { + arg["fromBlock"] = "0x0" + } else { + arg["fromBlock"] = toBlockNumArg(q.FromBlock) + } + arg["toBlock"] = toBlockNumArg(q.ToBlock) + } + return arg, nil +} + +func toBlockNumArg(number *big.Int) string { + if number == nil { + return "latest" + } + pending := big.NewInt(-1) + if number.Cmp(pending) == 0 { + return "pending" + } + finalized := big.NewInt(int64(rpc.FinalizedBlockNumber)) + if number.Cmp(finalized) == 0 { + return "finalized" + } + safe := big.NewInt(int64(rpc.SafeBlockNumber)) + if number.Cmp(safe) == 0 { + return "safe" + } + return hexutil.EncodeBig(number) +} + +func hex2int(hexStr string) uint64 { + // remove 0x suffix if found in the input string + cleaned := strings.Replace(hexStr, "0x", "", -1) + + // base 16 for hexadecimal + result, _ := strconv.ParseUint(cleaned, 16, 64) + return uint64(result) +} diff --git a/harvester/arbitrum_nova_adapter/contracts.go b/contracter/arbitrum_nova_adapter/contracts.go similarity index 90% rename from harvester/arbitrum_nova_adapter/contracts.go rename to contracter/arbitrum_nova_adapter/contracts.go index e145a9c3..6ff3471f 100644 --- a/harvester/arbitrum_nova_adapter/contracts.go +++ b/contracter/arbitrum_nova_adapter/contracts.go @@ -1,11 +1,10 @@ package arbitrum_nova_adapter import ( + _ "embed" "log" "strings" - _ "embed" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -30,6 +29,7 @@ type Contracts struct { dadTokenAddress common.Address stakeAddress common.Address nftAddress common.Address + nodeAddress common.Address AllAddresses []common.Address } @@ -59,11 +59,12 @@ func NewContracts(cfg *config.Arbitrum) *Contracts { dadTokenAddress: common.HexToAddress(cfg.DADTokenAddress), stakeAddress: common.HexToAddress(cfg.StakeAddress), nftAddress: common.HexToAddress(cfg.NFTAddress), + nodeAddress: common.HexToAddress(cfg.NodeAddress), } allAddresses := make([]common.Address, 0) allAddresses = append(allAddresses, - contracts.momTokenAddress, contracts.dadTokenAddress, contracts.stakeAddress, contracts.nftAddress) + contracts.momTokenAddress, contracts.dadTokenAddress, contracts.stakeAddress, contracts.nftAddress, contracts.nodeAddress) contracts.AllAddresses = allAddresses return contracts diff --git a/cmd/harvester/adapter/main.go b/contracter/cmd/adapter/main.go similarity index 85% rename from cmd/harvester/adapter/main.go rename to contracter/cmd/adapter/main.go index 13d8c1f1..6421b240 100644 --- a/cmd/harvester/adapter/main.go +++ b/contracter/cmd/adapter/main.go @@ -7,15 +7,15 @@ import ( "github.com/ethereum/go-ethereum/common" - "github.com/momentum-xyz/ubercontroller/harvester" - "github.com/momentum-xyz/ubercontroller/harvester/ethereum_adapter" + "github.com/momentum-xyz/ubercontroller/contracter" + "github.com/momentum-xyz/ubercontroller/contracter/ethereum_adapter" ) func main() { a := ethereum_adapter.NewEthereumAdapter() a.Run() - var l harvester.AdapterListener - l = func(blockNumber uint64, diffs []*harvester.BCDiff) { + var l contracter.AdapterListener + l = func(blockNumber uint64, diffs []*contracter.BCDiff, stakes []*contracter.BCStake) { fmt.Printf("Listener: %+v \n", blockNumber) for k, v := range diffs { fmt.Printf("%+v %+v %+v %+v\n", k, v.To, v.Token, v.Amount) diff --git a/cmd/harvester/arbitrum_nova/main.go b/contracter/cmd/arbitrum_nova/main.go similarity index 73% rename from cmd/harvester/arbitrum_nova/main.go rename to contracter/cmd/arbitrum_nova/main.go index ddbfd073..d6e741f7 100644 --- a/cmd/harvester/arbitrum_nova/main.go +++ b/contracter/cmd/arbitrum_nova/main.go @@ -9,8 +9,8 @@ import ( "go.uber.org/zap" "github.com/momentum-xyz/ubercontroller/config" - "github.com/momentum-xyz/ubercontroller/harvester" - "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + "github.com/momentum-xyz/ubercontroller/contracter" + "github.com/momentum-xyz/ubercontroller/contracter/arbitrum_nova_adapter" ) func main() { @@ -30,8 +30,8 @@ func main() { fmt.Printf("Last Block: %+v \n", n) - var l harvester.AdapterListener - l = func(blockNumber uint64, diffs []*harvester.BCDiff, stakes []*harvester.BCStake) { + var l contracter.AdapterListener + l = func(blockNumber uint64, diffs []*contracter.BCDiff, stakes []*contracter.BCStake) { fmt.Printf("Listener: %+v \n", blockNumber) for k, v := range diffs { fmt.Printf("%+v %+v %+v %+v\n", k, v.To, v.Token, v.Amount) @@ -62,23 +62,23 @@ func main() { for _, log := range logs { switch log.(type) { - case *harvester.TransferERC20Log: - l := log.(*harvester.TransferERC20Log) + case *contracter.TransferERC20Log: + l := log.(*contracter.TransferERC20Log) fmt.Printf("%s %s %s \n", l.From, l.To, l.Value) - //fmt.Println(log.(*harvester.TransferERC20Log).Value) + //fmt.Println(log.(*contracter.TransferERC20Log).Value) } } for _, log := range logs { switch log.(type) { - case *harvester.StakeLog: - l := log.(*harvester.StakeLog) + case *contracter.StakeLog: + l := log.(*contracter.StakeLog) fmt.Printf(" stake: %s %s %s %d %s %s \n", l.TxHash, l.UserWallet, l.OdysseyID, l.TokenType, l.AmountStaked, l.TotalStaked) - //fmt.Println(log.(*harvester.TransferERC20Log).Value) - case *harvester.UnstakeLog: - l := log.(*harvester.UnstakeLog) + //fmt.Println(log.(*contracter.TransferERC20Log).Value) + case *contracter.UnstakeLog: + l := log.(*contracter.UnstakeLog) fmt.Printf("unstake: %s %s %s %s \n", l.UserWallet, l.OdysseyID, l.AmountUnstaked, l.TotalStaked) - //fmt.Println(log.(*harvester.TransferERC20Log).Value) + //fmt.Println(log.(*contracter.TransferERC20Log).Value) } } diff --git a/cmd/harvester/get_logs_recursively/main.go b/contracter/cmd/get_logs_recursively/main.go similarity index 75% rename from cmd/harvester/get_logs_recursively/main.go rename to contracter/cmd/get_logs_recursively/main.go index f16e855f..b734793e 100644 --- a/cmd/harvester/get_logs_recursively/main.go +++ b/contracter/cmd/get_logs_recursively/main.go @@ -10,8 +10,8 @@ import ( "go.uber.org/zap" "github.com/momentum-xyz/ubercontroller/config" - "github.com/momentum-xyz/ubercontroller/harvester" - "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + "github.com/momentum-xyz/ubercontroller/contracter" + "github.com/momentum-xyz/ubercontroller/contracter/arbitrum_nova_adapter" ) func main() { @@ -47,24 +47,24 @@ func main() { for _, log := range logs { switch log.(type) { - case *harvester.TransferERC20Log: - l := log.(*harvester.TransferERC20Log) + case *contracter.TransferERC20Log: + l := log.(*contracter.TransferERC20Log) fmt.Printf("%s %s %s \n", l.From, l.To, l.Value) - //fmt.Println(log.(*harvester.TransferERC20Log).Value) + //fmt.Println(log.(*contracter.TransferERC20Log).Value) } } log.Println("LOGS:" + strconv.Itoa(len(logs))) for _, log := range logs { switch log.(type) { - case *harvester.StakeLog: - l := log.(*harvester.StakeLog) + case *contracter.StakeLog: + l := log.(*contracter.StakeLog) fmt.Printf(" stake: %s %s %s %d %s %s \n", l.TxHash, l.UserWallet, l.OdysseyID, l.TokenType, l.AmountStaked, l.TotalStaked) - //fmt.Println(log.(*harvester.TransferERC20Log).Value) - case *harvester.UnstakeLog: - l := log.(*harvester.UnstakeLog) + //fmt.Println(log.(*contracter.TransferERC20Log).Value) + case *contracter.UnstakeLog: + l := log.(*contracter.UnstakeLog) fmt.Printf("unstake: %s %s %s %s \n", l.UserWallet, l.OdysseyID, l.AmountUnstaked, l.TotalStaked) - //fmt.Println(log.(*harvester.TransferERC20Log).Value) + //fmt.Println(log.(*contracter.TransferERC20Log).Value) } } diff --git a/cmd/harvester/table2/main.go b/contracter/cmd/table/main.go similarity index 70% rename from cmd/harvester/table2/main.go rename to contracter/cmd/table/main.go index 309d5aa4..94bde0c3 100644 --- a/cmd/harvester/table2/main.go +++ b/contracter/cmd/table/main.go @@ -10,8 +10,8 @@ import ( "go.uber.org/zap" "github.com/momentum-xyz/ubercontroller/config" - "github.com/momentum-xyz/ubercontroller/harvester" - "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + "github.com/momentum-xyz/ubercontroller/contracter" + "github.com/momentum-xyz/ubercontroller/contracter/arbitrum_nova_adapter" ) func main() { @@ -35,13 +35,13 @@ func main() { a := arbitrum_nova_adapter.NewArbitrumNovaAdapter(cfg, logger.Sugar()) a.Run() - t := harvester.NewTable2(pool, a, listener) + t := contracter.NewTable(pool, a, listener) t.Run() time.Sleep(time.Hour) } -func listener(bcName string, events []*harvester.UpdateEvent, stakeEvents []*harvester.StakeEvent, nftEvent []*harvester.NftEvent) error { +func listener(bcName string, events []*contracter.UpdateEvent, stakeEvents []*contracter.StakeEvent, nftEvent []*contracter.NftEvent, transferEvents []*contracter.TransferOdysseyEvent) error { fmt.Printf("Table Listener: \n") for k, v := range events { fmt.Printf("%+v %+v %+v %+v \n", k, v.Wallet, v.Contract, v.Amount.String()) diff --git a/harvester/ethereum_adapter/ethereum_adapter.go b/contracter/ethereum_adapter/ethereum_adapter.go similarity index 96% rename from harvester/ethereum_adapter/ethereum_adapter.go rename to contracter/ethereum_adapter/ethereum_adapter.go index a6502058..b24a2129 100644 --- a/harvester/ethereum_adapter/ethereum_adapter.go +++ b/contracter/ethereum_adapter/ethereum_adapter.go @@ -19,12 +19,12 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/pkg/errors" - "github.com/momentum-xyz/ubercontroller/harvester" + "github.com/momentum-xyz/ubercontroller/contracter" "github.com/momentum-xyz/ubercontroller/utils/umid" ) type EthereumAdapter struct { - listener harvester.AdapterListener + listener contracter.AdapterListener umid umid.UMID rpcURL string httpURL string @@ -52,7 +52,7 @@ func (a *EthereumAdapter) GetInfo() (umid umid.UMID, name string, rpcURL string) return a.umid, a.name, a.rpcURL } -func (a *EthereumAdapter) RegisterNewBlockListener(f harvester.AdapterListener) { +func (a *EthereumAdapter) RegisterNewBlockListener(f contracter.AdapterListener) { a.listener = f } @@ -61,7 +61,7 @@ func (a *EthereumAdapter) GetLastBlockNumber() (uint64, error) { return number, err } -func (a *EthereumAdapter) GetLogs(fromBlock, toBlock int64, addresses []common.Address) ([]*harvester.BCDiff, []*harvester.BCStake, error) { +func (a *EthereumAdapter) GetLogs(fromBlock, toBlock int64, addresses []common.Address) ([]*contracter.BCDiff, []*contracter.BCStake, error) { query := ethereum.FilterQuery{ FromBlock: big.NewInt(fromBlock), @@ -81,8 +81,8 @@ func (a *EthereumAdapter) GetLogs(fromBlock, toBlock int64, addresses []common.A //fmt.Println(logStakeSigHash) //fmt.Println(a.stakeContractABI.Events["Stake"].ID.Hex()) - diffs := make([]*harvester.BCDiff, 0) - stakes := make([]*harvester.BCStake, 0) + diffs := make([]*contracter.BCDiff, 0) + stakes := make([]*contracter.BCStake, 0) //fmt.Println(a.contractABI.Events) @@ -94,7 +94,7 @@ func (a *EthereumAdapter) GetLogs(fromBlock, toBlock int64, addresses []common.A case logTransferSigHash.Hex(): //fmt.Printf("Log Name: Transfer\n") - var transferEvent harvester.BCDiff + var transferEvent contracter.BCDiff ev, err := a.contractABI.Unpack("Transfer", vLog.Data) if err != nil { @@ -138,7 +138,7 @@ func (a *EthereumAdapter) GetLogs(fromBlock, toBlock int64, addresses []common.A tokenType := ev[3].(uint8) - stake := &harvester.BCStake{ + stake := &contracter.BCStake{ From: fromWallet.Hex(), OdysseyID: odysseyID, TokenType: tokenType, @@ -231,7 +231,7 @@ func (a *EthereumAdapter) Run() { //fmt.Println(vLog.TxHash) //fmt.Println(vLog.Hash()) - block := &harvester.BCBlock{ + block := &contracter.BCBlock{ Hash: vLog.Hash().String(), } @@ -247,14 +247,14 @@ func (a *EthereumAdapter) Run() { }() } -func (a *EthereumAdapter) onNewBlock(b *harvester.BCBlock) { +func (a *EthereumAdapter) onNewBlock(b *contracter.BCBlock) { block, err := a.client.BlockByNumber(context.TODO(), big.NewInt(int64(b.Number))) if err != nil { err = errors.WithMessage(err, "failed to get block by number") fmt.Println(err) } - diffs := make([]*harvester.BCDiff, 0) + diffs := make([]*contracter.BCDiff, 0) for _, tx := range block.Transactions() { //fmt.Println(tx.Hash().Hex()) @@ -273,7 +273,7 @@ func (a *EthereumAdapter) onNewBlock(b *harvester.BCBlock) { } if methodName == "transfer" { - diff := &harvester.BCDiff{} + diff := &contracter.BCDiff{} diff.From = strings.ToLower(a.GetTransactionMessage(tx).From.Hex()) diff.To = strings.ToLower(methodInput["_to"].(common.Address).Hex()) @@ -283,7 +283,7 @@ func (a *EthereumAdapter) onNewBlock(b *harvester.BCBlock) { } if methodName == "transferFrom" { - diff := &harvester.BCDiff{} + diff := &contracter.BCDiff{} diff.From = strings.ToLower(methodInput["_from"].(common.Address).Hex()) diff.To = strings.ToLower(methodInput["_to"].(common.Address).Hex()) diff.Token = strings.ToLower(tx.To().Hex()) @@ -305,7 +305,7 @@ func (a *EthereumAdapter) onNewBlock(b *harvester.BCBlock) { ////amount.SetString("33190774000000000000000", 10) //amount.SetString("1", 10) // - //mockDiffs := []*harvester.BCDiff{ + //mockDiffs := []*contracter.BCDiff{ // { // From: "0x2813fd17ea95b2655a7228383c5236e31090419e", // To: "0x3f363b4e038a6e43ce8321c50f3efbf460196d4b", @@ -314,7 +314,7 @@ func (a *EthereumAdapter) onNewBlock(b *harvester.BCBlock) { // }, //} - stakes := make([]*harvester.BCStake, 0) + stakes := make([]*contracter.BCStake, 0) a.listener(b.Number, diffs, stakes) } diff --git a/contracter/interfaces.go b/contracter/interfaces.go new file mode 100644 index 00000000..bd90bd1e --- /dev/null +++ b/contracter/interfaces.go @@ -0,0 +1,164 @@ +package contracter + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + + "github.com/momentum-xyz/ubercontroller/utils/umid" +) + +type TransferERC20Log struct { + From string + To string + Value *big.Int + Contract string +} + +/** + * + * @param user User address + * @param odyssey Odyssey ID that's being staked + * @param amount_staked Amount being staked + * @param token Token used (MOM or DAD) + * @param total_staked Total being staked by the user on the Odyssey + */ +//event Stake(address user, bytes16 odyssey, uint256 amount_staked, Token token, uint256 total_staked); +type StakeLog struct { + LogIndex uint + TxHash string + UserWallet string + OdysseyID umid.UMID + AmountStaked *big.Int + TokenType uint8 + TotalStaked *big.Int +} + +/** + * + * @param user User address + * @param odyssey Odyssey ID that's being unstaked + * @param amount_unstaked Amount unstaked + * @param token Token used (MOM or DAD) + * @param total_staked Total remained staked by the user on that Odyssey + */ +//event Unstake(address user, bytes16 odyssey, uint256 amount_unstaked, Token token, uint256 total_staked); +type UnstakeLog struct { + LogIndex uint + TxHash string + UserWallet string + OdysseyID umid.UMID + AmountUnstaked *big.Int + TokenType uint8 + TotalStaked *big.Int +} + +/** + * + * @param user User address + * @param odyssey_from Odyssey ID that the user is removing stake + * @param odyssey_to Odyssey ID that the user is staking into + * @param amount Amount that's being restaked + * @param token Token used (MOM or DAD) + * @param total_staked_from Total amount of tokens that remains staked on the `odyssey_from` + * @param total_staked_to Total amount of tokens staked on `odyssey_to` + */ +//event Restake(address user, +//bytes16 odyssey_from, +//bytes16 odyssey_to, +//uint256 amount, +//Token token, +//uint256 total_staked_from, +//uint256 total_staked_to); +type RestakeLog struct { + UserWallet string + FromOdysseyID umid.UMID + ToOdysseyID umid.UMID + Amount *big.Int + TokenType uint8 + TotalStakedToFrom *big.Int + TotalStakedToTo *big.Int +} + +/** + * @dev Emitted when `tokenId` token is transferred from `from` to `to`. + */ +//event Transfer(address indexed from, address indexed to, uint256 indexed tokenId); +type TransferNFTLog struct { + From string + To string + TokenID umid.UMID + Contract string +} + +type TransferOdysseyLog struct { + FromNodeID umid.UMID + ToNodeID umid.UMID + OdysseyID umid.UMID +} + +type BCBlock struct { + Hash string + Number uint64 +} + +type BCDiff struct { + From string + To string + Token string + Amount *big.Int +} + +type BCStake struct { + From string + OdysseyID umid.UMID + TokenType uint8 //0-MOM; 1-DAD + Amount *big.Int + TotalAmount *big.Int +} + +type UpdateEvent struct { + Wallet string + Contract string + Amount *big.Int +} + +type StakeEvent struct { + TxHash string + LogIndex string + Wallet string + Kind uint8 + OdysseyID umid.UMID + Amount *big.Int + ActivityType string + //CreatedAt time.Time +} + +type NftEvent struct { + From string + To string + OdysseyID umid.UMID + Contract string +} + +type TransferOdysseyEvent struct { + FromNodeID umid.UMID + ToNodeID umid.UMID + OdysseyID umid.UMID +} + +type AdapterListener func(blockNumber uint64, diffs []*BCDiff, stakes []*BCStake) + +type Adapter interface { + GetLastBlockNumber() (uint64, error) + GetBalance(wallet string, contract string, blockNumber uint64) (*big.Int, error) + GetLogs(fromBlock, toBlock int64, addresses []common.Address) ([]any, error) + GetLogsRecursively(fromBlock, toBlock int64, addresses []common.Address, level int) ([]any, error) + RegisterNewBlockListener(f AdapterListener) + Run() + GetInfo() (umid umid.UMID, name string, rpcURL string) +} + +type BCType string + +type Event string diff --git a/harvester/table2.go b/contracter/table.go similarity index 87% rename from harvester/table2.go rename to contracter/table.go index 2c0ec5c6..caa7ba9f 100644 --- a/harvester/table2.go +++ b/contracter/table.go @@ -1,7 +1,8 @@ -package harvester +package contracter import ( "context" + "encoding/hex" "fmt" "log" "math/big" @@ -18,11 +19,13 @@ import ( ) /** -Table2 features: +Table features: - Always load data from Arbitrum from block 0 on every UC start */ -type Table2 struct { +type Address []byte + +type Table struct { mu deadlock.RWMutex blockNumber uint64 data map[string]map[string]*big.Int @@ -30,11 +33,11 @@ type Table2 struct { nftData map[umid.UMID]string db *pgxpool.Pool adapter Adapter - harvesterListener func(bcName string, p []*UpdateEvent, s []*StakeEvent, n []*NftEvent) error + harvesterListener func(bcName string, p []*UpdateEvent, s []*StakeEvent, n []*NftEvent, t []*TransferOdysseyEvent) error } -func NewTable2(db *pgxpool.Pool, adapter Adapter, listener func(bcName string, p []*UpdateEvent, s []*StakeEvent, n []*NftEvent) error) *Table2 { - return &Table2{ +func NewTable(db *pgxpool.Pool, adapter Adapter, listener func(bcName string, p []*UpdateEvent, s []*StakeEvent, n []*NftEvent, t []*TransferOdysseyEvent) error) *Table { + return &Table{ blockNumber: 0, data: make(map[string]map[string]*big.Int), stakesData: make(map[umid.UMID]map[string]map[uint8]*big.Int), @@ -45,13 +48,13 @@ func NewTable2(db *pgxpool.Pool, adapter Adapter, listener func(bcName string, p } } -func (t *Table2) Run() { +func (t *Table) Run() { t.fastForward() t.adapter.RegisterNewBlockListener(t.listener) } -func (t *Table2) fastForward() { +func (t *Table) fastForward() { t.mu.Lock() defer t.mu.Unlock() @@ -95,11 +98,12 @@ func (t *Table2) fastForward() { t.ProcessLogs(lastBlockNumber, logs) } -func (t *Table2) ProcessLogs(blockNumber uint64, logs []any) { +func (t *Table) ProcessLogs(blockNumber uint64, logs []any) { fmt.Printf("Block: %d \n", blockNumber) events := make([]*UpdateEvent, 0) stakeEvents := make([]*StakeEvent, 0) nftEvents := make([]*NftEvent, 0) + transferEvents := make([]*TransferOdysseyEvent, 0) nftLogs := make([]*TransferNFTLog, 0) stakeLogs := make([]*StakeLog, 0) @@ -111,7 +115,7 @@ func (t *Table2) ProcessLogs(blockNumber uint64, logs []any) { _, ok := t.data[diff.Contract] if !ok { - // Table2 store everything came from adapter + // Table store everything came from adapter t.data[diff.Contract] = make(map[string]*big.Int) } @@ -200,6 +204,14 @@ func (t *Table2) ProcessLogs(blockNumber uint64, logs []any) { OdysseyID: e.TokenID, }) nftLogs = append(nftLogs, e) + + case *TransferOdysseyLog: + e := log.(*TransferOdysseyLog) + transferEvents = append(transferEvents, &TransferOdysseyEvent{ + FromNodeID: e.FromNodeID, + ToNodeID: e.ToNodeID, + OdysseyID: e.OdysseyID, + }) } } @@ -207,7 +219,7 @@ func (t *Table2) ProcessLogs(blockNumber uint64, logs []any) { t.blockNumber = blockNumber _, name, _ := t.adapter.GetInfo() - if err := t.harvesterListener(name, events, stakeEvents, nftEvents); err != nil { + if err := t.harvesterListener(name, events, stakeEvents, nftEvents, transferEvents); err != nil { log.Printf("Error in harvester listener: %v\n", err) } @@ -232,14 +244,14 @@ func createIfEmpty(m map[umid.UMID]map[string]map[uint8]*big.Int, odysseyID umid } } -func (t *Table2) listener(blockNumber uint64, diffs []*BCDiff, stakes []*BCStake) { +func (t *Table) listener(blockNumber uint64, diffs []*BCDiff, stakes []*BCStake) { t.fastForward() //t.mu.Lock() //t.ProcessDiffs(blockNumber, diffs, stakes) //t.mu.Unlock() } -func (t *Table2) SaveToDB(events []*UpdateEvent, stakeEvents []*StakeEvent, nftLogs []*TransferNFTLog) (err error) { +func (t *Table) SaveToDB(events []*UpdateEvent, stakeEvents []*StakeEvent, nftLogs []*TransferNFTLog) (err error) { wallets := make([]Address, 0) contracts := make([]Address, 0) // Save balance by value to quickly unlock mutex, otherwise have to unlock util DB transaction finished @@ -289,7 +301,7 @@ func (t *Table2) SaveToDB(events []*UpdateEvent, stakeEvents []*StakeEvent, nftL return t.saveToDB(wallets, contracts, balances, stakes, nftLogs) } -func (t *Table2) saveToDB(wallets []Address, contracts []Address, balances []*entry.Balance, stakes []*entry.Stake, nftLogs []*TransferNFTLog) error { +func (t *Table) saveToDB(wallets []Address, contracts []Address, balances []*entry.Balance, stakes []*entry.Stake, nftLogs []*TransferNFTLog) error { blockchainUMID, name, rpcURL := t.adapter.GetInfo() tx, err := t.db.BeginTx(context.Background(), pgx.TxOptions{}) @@ -410,7 +422,7 @@ func (t *Table2) saveToDB(wallets []Address, contracts []Address, balances []*en return nil } -func (t *Table2) GetLastCommentByTxHash(txHash string) (string, error) { +func (t *Table) GetLastCommentByTxHash(txHash string) (string, error) { sqlQuery := `SELECT comment FROM pending_stake WHERE transaction_id = $1` row := t.db.QueryRow(context.TODO(), sqlQuery, HexToAddress(txHash)) @@ -427,15 +439,7 @@ func (t *Table2) GetLastCommentByTxHash(txHash string) (string, error) { return comment, err } -func (t *Table2) LoadFromDB() error { - panic("not implemented") -} - -func (t *Table2) AddWalletContract(wallet string, contract string) { - panic("not implemented") -} - -func (t *Table2) Display() { +func (t *Table) Display() { fmt.Println("Display:") for token, wallets := range t.data { for wallet, balance := range wallets { @@ -443,3 +447,24 @@ func (t *Table2) Display() { } } } + +func HexToAddress(s string) []byte { + b, err := hex.DecodeString(s[2:]) + if err != nil { + panic(err) + } + return b +} + +func unique(slice []Address) []Address { + keys := make(map[string]bool) + list := []Address{} + for _, entry := range slice { + entryStr := hex.EncodeToString(entry) + if _, value := keys[entryStr]; !value { + keys[entryStr] = true + list = append(list, entry) + } + } + return list +} diff --git a/harvester3/DB.go b/harvester/DB.go similarity index 99% rename from harvester3/DB.go rename to harvester/DB.go index 62c2237a..7ca299c8 100644 --- a/harvester3/DB.go +++ b/harvester/DB.go @@ -1,4 +1,4 @@ -package harvester3 +package harvester import ( "context" diff --git a/harvester3/Ethers.go b/harvester/Ethers.go similarity index 99% rename from harvester3/Ethers.go rename to harvester/Ethers.go index 4c6e11d8..e0c9a457 100644 --- a/harvester3/Ethers.go +++ b/harvester/Ethers.go @@ -1,4 +1,4 @@ -package harvester3 +package harvester import ( "math/big" diff --git a/harvester3/Harvester.go b/harvester/Harvester.go similarity index 98% rename from harvester3/Harvester.go rename to harvester/Harvester.go index 67e05426..209a3d91 100644 --- a/harvester3/Harvester.go +++ b/harvester/Harvester.go @@ -1,4 +1,4 @@ -package harvester3 +package harvester import ( "math/big" diff --git a/harvester3/NFTs.go b/harvester/NFTs.go similarity index 99% rename from harvester3/NFTs.go rename to harvester/NFTs.go index d33bbf7b..250b49c4 100644 --- a/harvester3/NFTs.go +++ b/harvester/NFTs.go @@ -1,4 +1,4 @@ -package harvester3 +package harvester import ( "math/big" diff --git a/harvester3/SubscribeQueue.go b/harvester/SubscribeQueue.go similarity index 99% rename from harvester3/SubscribeQueue.go rename to harvester/SubscribeQueue.go index 0fb9c54c..64fc14ac 100644 --- a/harvester3/SubscribeQueue.go +++ b/harvester/SubscribeQueue.go @@ -1,4 +1,4 @@ -package harvester3 +package harvester import ( "github.com/ethereum/go-ethereum/common" diff --git a/harvester3/Tokens.go b/harvester/Tokens.go similarity index 99% rename from harvester3/Tokens.go rename to harvester/Tokens.go index e1cf234b..e1503e1f 100644 --- a/harvester3/Tokens.go +++ b/harvester/Tokens.go @@ -1,4 +1,4 @@ -package harvester3 +package harvester import ( "math/big" diff --git a/harvester/arbitrum_nova_adapter/arbitrum_nova_adapter.go b/harvester/arbitrum_nova_adapter/arbitrum_nova_adapter.go index 5eafc5d9..a081181d 100644 --- a/harvester/arbitrum_nova_adapter/arbitrum_nova_adapter.go +++ b/harvester/arbitrum_nova_adapter/arbitrum_nova_adapter.go @@ -2,15 +2,16 @@ package arbitrum_nova_adapter import ( "context" + "encoding/json" "fmt" "log" "math" "math/big" "strconv" "strings" + "sync" "time" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -20,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/pkg/errors" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/momentum-xyz/ubercontroller/config" "github.com/momentum-xyz/ubercontroller/harvester" @@ -27,26 +29,24 @@ import ( ) type ArbitrumNovaAdapter struct { - logger *zap.SugaredLogger - listener harvester.AdapterListener - umid umid.UMID - wsURL string - httpURL string - name string - //client *ethclient.Client + listeners []func(blockNumber uint64) + umid umid.UMID + httpURL string + name string rpcClient *rpc.Client lastBlock uint64 - contracts *Contracts + + logger *zap.SugaredLogger } -func NewArbitrumNovaAdapter(cfg *config.Config, logger *zap.SugaredLogger) *ArbitrumNovaAdapter { +func NewArbitrumNovaAdapter(cfg *config.Arbitrum3, logger *zap.SugaredLogger) *ArbitrumNovaAdapter { return &ArbitrumNovaAdapter{ - logger: logger, umid: umid.MustParse("ccccaaaa-1111-2222-3333-222222222222"), - wsURL: cfg.Arbitrum.WSURL, - httpURL: cfg.Arbitrum.RPCURL, + httpURL: cfg.RPCURL, name: "arbitrum_nova", - contracts: NewContracts(&cfg.Arbitrum), + listeners: make([]func(blockNumber uint64), 0), + + logger: logger, } } @@ -63,11 +63,10 @@ func (a *ArbitrumNovaAdapter) Run() { var err error a.rpcClient, err = rpc.DialHTTP(a.httpURL) if err != nil { - log.Fatal(err) + a.logger.Error(err) } - a.logger.Debugf("Connected to Arbitrum Block Chain: %s", a.httpURL) - /////// + a.logger.Info("Connected to Arbitrum Block Chain: " + a.httpURL) ticker := time.NewTicker(1000 * time.Millisecond) done := make(chan bool) @@ -78,15 +77,14 @@ func (a *ArbitrumNovaAdapter) Run() { return case t := <-ticker.C: _ = t - //fmt.Println("Tick at", t) n, err := a.GetLastBlockNumber() if err != nil { a.logger.Error(err) } if a.lastBlock < n { a.lastBlock = n - if a.listener != nil { - a.listener(n, nil, nil) + for _, listener := range a.listeners { + listener(n) } } } @@ -95,37 +93,45 @@ func (a *ArbitrumNovaAdapter) Run() { } func (a *ArbitrumNovaAdapter) RegisterNewBlockListener(f harvester.AdapterListener) { - a.listener = f + a.listeners = append(a.listeners, f) } -func (a *ArbitrumNovaAdapter) GetBalance(wallet string, contract string, blockNumber uint64) (*big.Int, error) { +func (a *ArbitrumNovaAdapter) GetTokenBalance(contract *common.Address, wallet *common.Address, blockNumber uint64) (*big.Int, uint64, error) { type request struct { To string `json:"to"` Data string `json:"data"` } + w := wallet.Hex() + c := contract.Hex() + // "0x70a08231" - crypto.Keccak256Hash([]byte("balanceOf(address)")).String()[0:10] - data := "0x70a08231" + fmt.Sprintf("%064s", wallet[2:]) // %064s means that the string is padded with 0 to 64 bytes - req := request{contract, data} + data := "0x70a08231" + fmt.Sprintf("%064s", w[2:]) // %064s means that the string is padded with 0 to 32 bytes + req := request{c, data} var resp string n := hexutil.EncodeUint64(blockNumber) if err := a.rpcClient.Call(&resp, "eth_call", req, n); err != nil { - return nil, errors.WithMessage(err, "failed to make RPC call to arbitrum:") + return nil, 0, errors.WithMessage(err, "failed to make RPC call to arbitrum:") } + balance := stringToBigInt(resp) + + return balance, blockNumber, nil +} + +func stringToBigInt(str string) *big.Int { // remove leading zero of resp - t := strings.TrimLeft(resp[2:], "0") + t := strings.TrimLeft(str[2:], "0") if t == "" { t = "0" } s := "0x" + t - balance, err := hexutil.DecodeBig(s) + b, err := hexutil.DecodeBig(s) if err != nil { log.Fatal(err) } - - return balance, nil + return b } func (a *ArbitrumNovaAdapter) GetTransactionMessage(tx *types.Transaction) *core.Message { @@ -160,280 +166,379 @@ func (a *ArbitrumNovaAdapter) DecodeTransactionInputData(contractABI *abi.ABI, d return method.Name, inputsMap, nil } -func (a *ArbitrumNovaAdapter) GetLogsRecursively(fromBlock, toBlock int64, contracts []common.Address, level int) ([]any, error) { - - a.logger.Debugln("GET: ", strconv.Itoa(level), fromBlock, toBlock) - if level > 3 { - return nil, errors.New("GetLogsWrapper maximum recursion level") +func (a *ArbitrumNovaAdapter) GetRawLogs( + topic0 *common.Hash, + topic1 *common.Hash, + topic2 *common.Hash, + addresses []common.Address, + fromBlock *big.Int, + toBlock *big.Int, +) (replies []types.Log, err error) { + args := make(map[string]interface{}) + var topix []any + topix = append(topix, topic0) + topix = append(topix, topic1) + topix = append(topix, topic2) + + args["topics"] = topix + args["address"] = addresses + args["fromBlock"] = hexutil.EncodeBig(fromBlock) + args["toBlock"] = hexutil.EncodeBig(toBlock) + if err != nil { + return } + err = a.rpcClient.CallContext(context.TODO(), &replies, "eth_getLogs", args) + return +} - logs, err := a.GetLogs(fromBlock, toBlock, contracts) +func (a *ArbitrumNovaAdapter) GetEtherBalance(wallet *common.Address, block uint64) (*big.Int, error) { + resp := "" + err := a.rpcClient.Call(&resp, "eth_getBalance", wallet.Hex(), hexutil.EncodeUint64(block)) if err != nil { - a.logger.Error(err) - allLogs := make([]any, 0) - parts := int64(7) - if toBlock-fromBlock < parts { - return nil, errors.WithMessage(err, "can not split getLogs to parts") - } + return nil, err + } - step, _ := math.Modf(float64((toBlock - fromBlock) / parts)) - for i := 1; i <= int(parts); i++ { + balance := stringToBigInt(resp) + return balance, err +} - from := int64(fromBlock) + int64(i-1)*int64(step) - to := int64(fromBlock) + int64(i)*int64(step) - 1 - if int64(i) == parts { - to = toBlock - } +func (a *ArbitrumNovaAdapter) GetEtherLogs(fromBlock, toBlock uint64, wallets map[common.Address]bool) ([]harvester.ChangeEtherLog, error) { + workers := uint64(20) + chunkSize := math.Ceil(float64((toBlock - fromBlock + 1) / workers)) + if chunkSize == 0 { + chunkSize = 1 + } - l, err := a.GetLogsRecursively(from, to, contracts, level+1) - if err != nil { - return nil, errors.WithMessage(err, "recursive call error") - } - a.logger.Debugln(i, from, to) - allLogs = append(allLogs, l...) - } + blockNumbers := make([]uint64, 0) + for b := fromBlock; b <= toBlock; b++ { + blockNumbers = append(blockNumbers, b) + } - return allLogs, nil + chunks := chunkSlice(blockNumbers, uint64(chunkSize)) + + var g errgroup.Group + mu := sync.Mutex{} + logs := make([]harvester.ChangeEtherLog, 0) + + for _, chunk := range chunks { + func(chunk []uint64) { + g.Go(func() error { + from := chunk[0] + to := chunk[len(chunk)-1] + l, err := a.getEtherLogs(from, to, wallets) + mu.Lock() + logs = append(logs, l...) + mu.Unlock() + return err + }) + }(chunk) } - a.logger.Debugln("RETURN: ", fromBlock, toBlock, len(logs)) - return logs, err + if err := g.Wait(); err != nil { + return nil, err + } + return logs, nil } -func (a *ArbitrumNovaAdapter) GetLogs(fromBlock, toBlock int64, contracts []common.Address) ([]any, error) { +func (a *ArbitrumNovaAdapter) getEtherLogs(fromBlock, toBlock uint64, wallets map[common.Address]bool) ([]harvester.ChangeEtherLog, error) { + logs := make([]harvester.ChangeEtherLog, 0) + res := make(map[string]any) - if contracts == nil { - contracts = a.contracts.AllAddresses + type respTx struct { + Hash string `json:"hash"` + From string `json:"from"` + To *string `json:"to"` + Value string `json:"value"` + Gas string `json:"gas"` } - query := ethereum.FilterQuery{ - FromBlock: big.NewInt(fromBlock), - ToBlock: big.NewInt(toBlock), - Addresses: contracts, + mu := sync.Mutex{} + + for b := fromBlock; b <= toBlock; b++ { + blockNumber := hexutil.EncodeUint64(b) + a.logger.Debugln("eth_getBlockByNumber", fromBlock, toBlock) + err := a.rpcClient.Call(&res, "eth_getBlockByNumber", blockNumber, true) + if err != nil { + return nil, err + } + txs, ok := res["transactions"] + if !ok { + continue + } + txsMap, ok := txs.([]any) + for _, txMap := range txsMap { + jsonData, _ := json.Marshal(txMap) + tx := respTx{} + err = json.Unmarshal(jsonData, &tx) + if err != nil { + return nil, err + } + + to := common.Address{} + if tx.To != nil { + to = common.HexToAddress(*tx.To) + } + from := common.HexToAddress(tx.From) + + _, hasFrom := wallets[from] + _, hasTo := wallets[to] + + if !hasFrom && !hasTo { + continue + } + + if tx.Gas != "0x0" && hasFrom { + func() { + receipt, err := a.eth_getTransactionReceipt(tx.Hash) + if err != nil { + a.logger.Error(err) + return + } + //if receipt.Status != "0x1" { + // return + //} + + gasUsed := big.NewInt(int64(hex2int(receipt.GasUsed))) + gasPrice := big.NewInt(int64(hex2int(receipt.EffectiveGasPrice))) + delta := gasUsed.Mul(gasUsed, gasPrice) + delta = delta.Neg(delta) + + mu.Lock() + logs = append(logs, harvester.ChangeEtherLog{ + Block: b, + Wallet: common.HexToAddress(receipt.From), + Delta: delta, + }) + mu.Unlock() + }() + } + + if tx.Value == "0x0" { + // Tx fee already counted in previous section + continue + } + + if hasTo { + logs = append(logs, harvester.ChangeEtherLog{ + Block: b, + Wallet: to, + Delta: big.NewInt(int64(hex2int(tx.Value))), + }) + } + if hasFrom { + delta := big.NewInt(int64(hex2int(tx.Value))) + logs = append(logs, harvester.ChangeEtherLog{ + Block: b, + Wallet: common.HexToAddress(tx.From), + Delta: delta.Neg(delta), + }) + } + } } - bcLogs, err := a.FilterLogs(context.TODO(), query) + return logs, nil +} + +type TransactionReceipt struct { + From string `json:"from"` + Status string `json:"status"` + GasUsed string `json:"gasUsed"` + EffectiveGasPrice string `json:"effectiveGasPrice"` +} + +func (a *ArbitrumNovaAdapter) eth_getTransactionReceipt(hash string) (*TransactionReceipt, error) { + res := TransactionReceipt{} + err := a.rpcClient.Call(&res, "eth_getTransactionReceipt", hash) if err != nil { - return nil, errors.WithMessage(err, "failed to filter log") + return nil, err } + return &res, nil +} - logs := make([]any, 0) - +func (a *ArbitrumNovaAdapter) GetNFTLogs(fromBlock, toBlock uint64, contracts []common.Address) ([]any, error) { logTransferSig := []byte("Transfer(address,address,uint256)") logTransferSigHash := crypto.Keccak256Hash(logTransferSig) + bcLogs, err := a.GetRawLogs(&logTransferSigHash, nil, nil, contracts, big.NewInt(int64(fromBlock)), big.NewInt(int64(toBlock))) + if err != nil { + return nil, errors.WithMessage(err, "failed to filter log") + } - logStakeSigHash := crypto.Keccak256Hash([]byte(a.contracts.StakeABI.Events["Stake"].Sig)) - logUnstakeSigHash := a.contracts.StakeABI.Events["Unstake"].ID - logRestakeSigHash := a.contracts.StakeABI.Events["Restake"].ID - - logTransferNftHash := a.contracts.NftABI.Events["Transfer"].ID + logs := make([]any, 0) for _, vLog := range bcLogs { + + if len(vLog.Topics) != 4 { + a.logger.Error("Transfer NFT log must have 4 topic items") + continue + } //fmt.Printf("Log Block Number: %d\n", vLog.BlockNumber) //fmt.Printf("Log Index: %d\n", vLog.Index) - // Iterate contracts - switch vLog.Address.Hex() { - case a.contracts.momTokenAddress.Hex(), a.contracts.dadTokenAddress.Hex(): - switch vLog.Topics[0].Hex() { - case logTransferSigHash.Hex(): - //fmt.Printf("Log Name: Transfer\n") + var e harvester.TransferNFTLog - //var transferEvent harvester.BCDiff + e.Block = vLog.BlockNumber + e.Contract = vLog.Address + // Hex and Un Hex here used to remove padding zeros + e.From = common.HexToAddress(vLog.Topics[1].Hex()) + e.To = common.HexToAddress(vLog.Topics[2].Hex()) + e.TokenID = vLog.Topics[3] - var e harvester.TransferERC20Log + logs = append(logs, &e) + } - ev, err := a.contracts.TokenABI.Unpack("Transfer", vLog.Data) - if err != nil { - return nil, errors.WithMessage(err, "failed to unpack event from ABI") - } + return logs, nil +} - e.Contract = strings.ToLower(vLog.Address.Hex()) - // Hex and Un Hex here used to remove padding zeros - e.From = strings.ToLower(common.HexToAddress(vLog.Topics[1].Hex()).Hex()) - e.To = strings.ToLower(common.HexToAddress(vLog.Topics[2].Hex()).Hex()) - if len(ev) > 0 { - e.Value = ev[0].(*big.Int) - } +func (a *ArbitrumNovaAdapter) GetTokenLogs(fromBlock, toBlock uint64, contracts []common.Address) ([]any, error) { + logTransferSig := []byte("Transfer(address,address,uint256)") + logTransferSigHash := crypto.Keccak256Hash(logTransferSig) + bcLogs, err := a.GetRawLogs(&logTransferSigHash, nil, nil, contracts, big.NewInt(int64(fromBlock)), big.NewInt(int64(toBlock))) + if err != nil { + return nil, errors.WithMessage(err, "failed to filter log") + } - logs = append(logs, &e) - } + logs := make([]any, 0) - case a.contracts.stakeAddress.Hex(): - switch vLog.Topics[0].Hex() { + for _, vLog := range bcLogs { - case logStakeSigHash.Hex(): - ev, err := a.contracts.StakeABI.Unpack("Stake", vLog.Data) - if err != nil { - return nil, errors.WithMessage(err, "failed to unpack event from ABI") - } + if len(vLog.Topics) == 4 { + a.logger.Error("Got Transfer NFT log from blockchain in Token contract handler") + continue + } + //fmt.Printf("Log Block Number: %d\n", vLog.BlockNumber) + //fmt.Printf("Log Index: %d\n", vLog.Index) - // Hack to remove extra zeroes - fromWallet := common.HexToAddress(vLog.Topics[1].Hex()) + var e harvester.TransferERC20Log - b := vLog.Topics[2].Bytes()[16:] - odysseyID, err := umid.FromBytes(b) - if err != nil { - return nil, errors.WithMessage(err, "failed to parse umid from bytes") - } - if odysseyID == umid.MustParse("ccccaaaa-1111-2222-3333-222222222222") || - odysseyID == umid.MustParse("ccccaaaa-1111-2222-3333-222222222244") || - odysseyID == umid.MustParse("ccccaaaa-1111-2222-3333-222222222241") { - // Skip test Odyssey IDs - continue - } + e.Block = vLog.BlockNumber + e.Contract = vLog.Address + // Hex and Un Hex here used to remove padding zeros + e.From = common.HexToAddress(vLog.Topics[1].Hex()) + e.To = common.HexToAddress(vLog.Topics[2].Hex()) - transactionHash := vLog.TxHash.Hex() - amount := ev[0].(*big.Int) - tokenType := ev[1].(uint8) - totalAmount := ev[2].(*big.Int) - - e := &harvester.StakeLog{ - TxHash: transactionHash, - LogIndex: vLog.Index, - UserWallet: fromWallet.Hex(), - OdysseyID: odysseyID, - AmountStaked: amount, - TokenType: tokenType, - TotalStaked: totalAmount, - } + data := common.TrimLeftZeroes(vLog.Data) + hex := common.Bytes2Hex(data) + hex = TrimLeftZeroes(hex) + if hex == "" { + a.logger.Error("Got Transfer Token log with empty data") + continue + } + erc20Amount, err := hexutil.DecodeBig("0x" + hex) + if err != nil { + a.logger.Error(err) + } + e.Value = erc20Amount - logs = append(logs, e) + logs = append(logs, &e) + } - case logUnstakeSigHash.Hex(): - ev, err := a.contracts.StakeABI.Unpack("Unstake", vLog.Data) - if err != nil { - return nil, errors.WithMessage(err, "failed to unpack event from ABI") - } + return logs, nil +} - // Hack to remove extra zeroes - fromWallet := common.HexToAddress(vLog.Topics[1].Hex()) +func TrimLeftZeroes(hex string) string { + idx := 0 + for ; idx < len(hex); idx++ { + if hex[idx] != '0' { + break + } + } + return hex[idx:] +} - b1 := vLog.Topics[2].Bytes() - b2 := b1[16:] - odysseyID, err := umid.FromBytes(b2) - if err != nil { - return nil, errors.WithMessage(err, "failed to parse umid from bytes") - } +func (a *ArbitrumNovaAdapter) GetInfo() (umid umid.UMID, name string, rpcURL string) { + return a.umid, a.name, a.httpURL +} - transactionHash := vLog.TxHash.Hex() - amount := ev[0].(*big.Int) - tokenType := ev[1].(uint8) - totalAmount := ev[2].(*big.Int) - - e := &harvester.UnstakeLog{ - TxHash: transactionHash, - LogIndex: vLog.Index, - UserWallet: fromWallet.Hex(), - OdysseyID: odysseyID, - AmountUnstaked: amount, - TokenType: tokenType, - TotalStaked: totalAmount, - } +func hex2int(hexStr string) uint64 { + // remove 0x suffix if found in the input string + cleaned := strings.Replace(hexStr, "0x", "", -1) + + // base 16 for hexadecimal + result, _ := strconv.ParseUint(cleaned, 16, 64) + return result +} - logs = append(logs, e) +func (a *ArbitrumNovaAdapter) GetNFTBalance(nftContract *common.Address, wallet *common.Address, block uint64) ([]common.Hash, error) { + transferString := "Transfer(address,address,uint256)" + transferTopic := common.BytesToHash(crypto.Keccak256([]byte(transferString))) - case logRestakeSigHash.Hex(): - a.logger.Debugln("Restake") - } - case a.contracts.nftAddress.Hex(): - a.logger.Debugln("NFT") + if nftContract == nil { + return nil, errors.New("Failed to GetNFTBalance: NFT contract can not be nil") + } - switch vLog.Topics[0].Hex() { - case logTransferNftHash.Hex(): - // TODO Not sure why vLog.Data is empty - //ev, err := a.contracts.NftABI.Unpack("Transfer", vLog.Data) - //if err != nil { - // return nil, errors.WithMessage(err, "failed to unpack event from ABI") - //} + contracts := []common.Address{ + *nftContract, + } - from := strings.ToLower(common.HexToAddress(vLog.Topics[1].Hex()).Hex()) - to := strings.ToLower(common.HexToAddress(vLog.Topics[2].Hex()).Hex()) - itemID := vLog.Topics[3].Big() + logsFrom, err := a.GetRawLogs(&transferTopic, addrToHash(wallet), nil, contracts, big.NewInt(0), big.NewInt(int64(block))) + if err != nil { + return nil, errors.WithMessage(err, "failed to get logs for nft contract") + } - var id umid.UMID - itemID.FillBytes(id[:]) + logsTo, err := a.GetRawLogs(&transferTopic, nil, addrToHash(wallet), contracts, big.NewInt(0), big.NewInt(int64(block))) + if err != nil { + return nil, errors.WithMessage(err, "failed to get logs for nft contract") + } - if err != nil { - return nil, errors.WithMessage(err, "failed to read umid from bytes") - } + m := make(map[common.Hash]int8) - e := &harvester.TransferNFTLog{ - From: from, - To: to, - TokenID: id, - Contract: strings.ToLower(vLog.Address.Hex()), - } + for _, l := range logsFrom { + id := l.Topics[3] - logs = append(logs, e) - } + _, ok := m[id] + if !ok { + m[id] = 0 } + m[id] -= 1 } - return logs, nil -} + for _, l := range logsTo { + id := l.Topics[3] -func (a *ArbitrumNovaAdapter) GetInfo() (umid umid.UMID, name string, rpcURL string) { - return a.umid, a.name, a.wsURL -} + _, ok := m[id] + if !ok { + m[id] = 0 + } -func (a *ArbitrumNovaAdapter) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { - var result []types.Log - arg, err := toFilterArg(q) - if err != nil { - return nil, err + m[id] += 1 } - err = a.rpcClient.CallContext(ctx, &result, "eth_getLogs", arg) - return result, err -} -func toFilterArg(q ethereum.FilterQuery) (interface{}, error) { - arg := map[string]interface{}{ - "address": q.Addresses, - "topics": q.Topics, - } - if q.BlockHash != nil { - arg["blockHash"] = *q.BlockHash - if q.FromBlock != nil || q.ToBlock != nil { - return nil, fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock") + ids := make([]common.Hash, 0) + for id, i := range m { + if i != 0 && i != 1 { + a.logger.Error("Failed to parse NFT transfers, Something wrong in blockchain history") } - } else { - if q.FromBlock == nil { - arg["fromBlock"] = "0x0" - } else { - arg["fromBlock"] = toBlockNumArg(q.FromBlock) + if i == 1 { + ids = append(ids, id) } - arg["toBlock"] = toBlockNumArg(q.ToBlock) } - return arg, nil + + return ids, nil } -func toBlockNumArg(number *big.Int) string { - if number == nil { - return "latest" +func addrToHash(addr *common.Address) *common.Hash { + if addr == nil { + return nil } - pending := big.NewInt(-1) - if number.Cmp(pending) == 0 { - return "pending" - } - finalized := big.NewInt(int64(rpc.FinalizedBlockNumber)) - if number.Cmp(finalized) == 0 { - return "finalized" - } - safe := big.NewInt(int64(rpc.SafeBlockNumber)) - if number.Cmp(safe) == 0 { - return "safe" - } - return hexutil.EncodeBig(number) + res := common.HexToHash(addr.Hex()) + return &res } -func hex2int(hexStr string) uint64 { - // remove 0x suffix if found in the input string - cleaned := strings.Replace(hexStr, "0x", "", -1) +func chunkSlice(slice []uint64, chunkSize uint64) [][]uint64 { + var chunks [][]uint64 + for i := uint64(0); i < uint64(len(slice)); i += chunkSize { + end := i + chunkSize - // base 16 for hexadecimal - result, _ := strconv.ParseUint(cleaned, 16, 64) - return uint64(result) + // necessary check to avoid slicing beyond + // slice capacity + if end > uint64(len(slice)) { + end = uint64(len(slice)) + } + + chunks = append(chunks, slice[i:end]) + } + + return chunks } diff --git a/harvester/callbacks.go b/harvester/callbacks.go deleted file mode 100644 index 6dfbadb1..00000000 --- a/harvester/callbacks.go +++ /dev/null @@ -1,53 +0,0 @@ -package harvester - -import ( - "sync" -) - -type Callback *func(p any) - -type Callbacks struct { - Mu sync.RWMutex - Data map[string]map[Event]map[Callback]bool -} - -func NewCallbacks() *Callbacks { - return &Callbacks{ - Mu: sync.RWMutex{}, - Data: make(map[string]map[Event]map[Callback]bool), - } -} - -func (c *Callbacks) Add(bcType string, event Event, f Callback) { - c.Mu.Lock() - defer c.Mu.Unlock() - - if c.Data[bcType] == nil { - c.Data[bcType] = map[Event]map[Callback]bool{} - } - if c.Data[bcType][event] == nil { - c.Data[bcType][event] = map[Callback]bool{} - } - - c.Data[bcType][event][f] = true -} - -func (c *Callbacks) Remove(bcType string, event Event, f Callback) { - c.Mu.Lock() - defer c.Mu.Unlock() - - if _, ok := c.Data[bcType][event][f]; ok == true { - delete(c.Data[bcType][event], f) - } -} - -func (c *Callbacks) Trigger(bcType string, event Event, p any) { - if _, ok := c.Data[bcType][event]; ok == false { - return - } - - for pf := range c.Data[bcType][event] { - f := *pf - go f(p) - } -} diff --git a/harvester3/cmd/adapter-getBalance/main.go b/harvester/cmd/adapter-getBalance/main.go similarity index 85% rename from harvester3/cmd/adapter-getBalance/main.go rename to harvester/cmd/adapter-getBalance/main.go index 8f43fa48..d02e0ad0 100644 --- a/harvester3/cmd/adapter-getBalance/main.go +++ b/harvester/cmd/adapter-getBalance/main.go @@ -7,8 +7,8 @@ import ( "github.com/ethereum/go-ethereum/common" - "github.com/momentum-xyz/ubercontroller/harvester3/arbitrum_nova_adapter3" - helper "github.com/momentum-xyz/ubercontroller/harvester3/cmd" + "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + helper "github.com/momentum-xyz/ubercontroller/harvester/cmd" ) func main() { @@ -18,7 +18,7 @@ func main() { logger := helper.GetZapLogger() sugaredLogger := logger.Sugar() - a := arbitrum_nova_adapter3.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) + a := arbitrum_nova_adapter.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) a.Run() diff --git a/harvester3/cmd/adapter-getEtherBalance/main.go b/harvester/cmd/adapter-getEtherBalance/main.go similarity index 88% rename from harvester3/cmd/adapter-getEtherBalance/main.go rename to harvester/cmd/adapter-getEtherBalance/main.go index 13155d06..e7fe7ef5 100644 --- a/harvester3/cmd/adapter-getEtherBalance/main.go +++ b/harvester/cmd/adapter-getEtherBalance/main.go @@ -6,8 +6,8 @@ import ( "github.com/ethereum/go-ethereum/common" - "github.com/momentum-xyz/ubercontroller/harvester3/arbitrum_nova_adapter3" - helper "github.com/momentum-xyz/ubercontroller/harvester3/cmd" + "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + helper "github.com/momentum-xyz/ubercontroller/harvester/cmd" ) func main() { @@ -48,7 +48,7 @@ func main() { wKovi = common.HexToAddress("0xc6220f7F21e15B8886eD38A98496E125b564c414") } - a := arbitrum_nova_adapter3.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) + a := arbitrum_nova_adapter.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) a.Run() n, err := a.GetLastBlockNumber() diff --git a/harvester3/cmd/adapter-getEtherLogs/main.go b/harvester/cmd/adapter-getEtherLogs/main.go similarity index 90% rename from harvester3/cmd/adapter-getEtherLogs/main.go rename to harvester/cmd/adapter-getEtherLogs/main.go index ea631ee5..641d2eb2 100644 --- a/harvester3/cmd/adapter-getEtherLogs/main.go +++ b/harvester/cmd/adapter-getEtherLogs/main.go @@ -7,8 +7,8 @@ import ( "github.com/ethereum/go-ethereum/common" - "github.com/momentum-xyz/ubercontroller/harvester3/arbitrum_nova_adapter3" - helper "github.com/momentum-xyz/ubercontroller/harvester3/cmd" + "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + helper "github.com/momentum-xyz/ubercontroller/harvester/cmd" ) func main() { @@ -50,7 +50,7 @@ func main() { wKovi = common.HexToAddress("0xc6220f7F21e15B8886eD38A98496E125b564c414") } - a := arbitrum_nova_adapter3.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) + a := arbitrum_nova_adapter.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) a.Run() n, err := a.GetLastBlockNumber() diff --git a/harvester3/cmd/adapter-getNFTBalance/main.go b/harvester/cmd/adapter-getNFTBalance/main.go similarity index 90% rename from harvester3/cmd/adapter-getNFTBalance/main.go rename to harvester/cmd/adapter-getNFTBalance/main.go index faf57f53..3cffc849 100644 --- a/harvester3/cmd/adapter-getNFTBalance/main.go +++ b/harvester/cmd/adapter-getNFTBalance/main.go @@ -6,8 +6,8 @@ import ( "github.com/ethereum/go-ethereum/common" - "github.com/momentum-xyz/ubercontroller/harvester3/arbitrum_nova_adapter3" - helper "github.com/momentum-xyz/ubercontroller/harvester3/cmd" + "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + helper "github.com/momentum-xyz/ubercontroller/harvester/cmd" ) func main() { @@ -48,7 +48,7 @@ func main() { wKovi = common.HexToAddress("0xc6220f7F21e15B8886eD38A98496E125b564c414") } - a := arbitrum_nova_adapter3.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) + a := arbitrum_nova_adapter.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) a.Run() n, err := a.GetLastBlockNumber() diff --git a/harvester3/cmd/adapter/main.go b/harvester/cmd/adapter/main.go similarity index 82% rename from harvester3/cmd/adapter/main.go rename to harvester/cmd/adapter/main.go index 8bba72c8..b77a1a9f 100644 --- a/harvester3/cmd/adapter/main.go +++ b/harvester/cmd/adapter/main.go @@ -7,9 +7,9 @@ import ( "github.com/ethereum/go-ethereum/common" - "github.com/momentum-xyz/ubercontroller/harvester3" - "github.com/momentum-xyz/ubercontroller/harvester3/arbitrum_nova_adapter3" - helper "github.com/momentum-xyz/ubercontroller/harvester3/cmd" + "github.com/momentum-xyz/ubercontroller/harvester" + "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + helper "github.com/momentum-xyz/ubercontroller/harvester/cmd" ) func main() { @@ -18,7 +18,7 @@ func main() { logger := helper.GetZapLogger() sugaredLogger := logger.Sugar() - a := arbitrum_nova_adapter3.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) + a := arbitrum_nova_adapter.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) a.Run() n, err := a.GetLastBlockNumber() @@ -28,7 +28,7 @@ func main() { fmt.Printf("Last Block: %+v \n", n) - var l harvester3.AdapterListener + var l harvester.AdapterListener l = func(blockNumber uint64) { } @@ -46,8 +46,8 @@ func main() { for _, log := range logs { switch log.(type) { - case *harvester3.TransferERC20Log: - l := log.(*harvester3.TransferERC20Log) + case *harvester.TransferERC20Log: + l := log.(*harvester.TransferERC20Log) fmt.Printf("% s %s %s %s \n", l.Contract, l.From, l.To, l.Value) //fmt.Println(log.(*harvester.TransferERC20Log).Value) } diff --git a/harvester3/cmd/ethers/main.go b/harvester/cmd/ethers/main.go similarity index 83% rename from harvester3/cmd/ethers/main.go rename to harvester/cmd/ethers/main.go index dc1adc05..7e87b05f 100644 --- a/harvester3/cmd/ethers/main.go +++ b/harvester/cmd/ethers/main.go @@ -9,9 +9,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/jackc/pgx/v4/pgxpool" - "github.com/momentum-xyz/ubercontroller/harvester3" - "github.com/momentum-xyz/ubercontroller/harvester3/arbitrum_nova_adapter3" - helper "github.com/momentum-xyz/ubercontroller/harvester3/cmd" + "github.com/momentum-xyz/ubercontroller/harvester" + "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + helper "github.com/momentum-xyz/ubercontroller/harvester/cmd" ) func main() { @@ -62,13 +62,13 @@ func main() { wKovi = common.HexToAddress("0xc6220f7F21e15B8886eD38A98496E125b564c414") } - a := arbitrum_nova_adapter3.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) + a := arbitrum_nova_adapter.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) a.Run() - output := make(chan harvester3.UpdateCell) + output := make(chan harvester.UpdateCell) go worker(output) - matrix := harvester3.NewNFTs(pool, a, sugaredLogger, output) + matrix := harvester.NewNFTs(pool, a, sugaredLogger, output) err = matrix.Run() if err != nil { log.Fatal(err) @@ -91,7 +91,7 @@ func main() { time.Sleep(time.Second * 30) } -func worker(c <-chan harvester3.UpdateCell) { +func worker(c <-chan harvester.UpdateCell) { for { u := <-c fmt.Println("worker", u.Contract, u.Wallet, u.IDs) diff --git a/harvester3/cmd/harvester/main.go b/harvester/cmd/harvester/main.go similarity index 81% rename from harvester3/cmd/harvester/main.go rename to harvester/cmd/harvester/main.go index 240d3843..db79dd5e 100644 --- a/harvester3/cmd/harvester/main.go +++ b/harvester/cmd/harvester/main.go @@ -9,9 +9,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/jackc/pgx/v4/pgxpool" - "github.com/momentum-xyz/ubercontroller/harvester3" - "github.com/momentum-xyz/ubercontroller/harvester3/arbitrum_nova_adapter3" - helper "github.com/momentum-xyz/ubercontroller/harvester3/cmd" + "github.com/momentum-xyz/ubercontroller/harvester" + "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + helper "github.com/momentum-xyz/ubercontroller/harvester/cmd" ) func main() { @@ -55,13 +55,13 @@ func main() { w2 = common.HexToAddress("0x695c0AbC571F5F434351dAB51b92b851562a8BE1") } - a := arbitrum_nova_adapter3.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) + a := arbitrum_nova_adapter.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) a.Run() - harvesterOutput := make(chan harvester3.UpdateCell) + harvesterOutput := make(chan harvester.UpdateCell) go worker(harvesterOutput) - harvester := harvester3.NewHarvester(harvesterOutput, pool, a, sugaredLogger) + harvester := harvester.NewHarvester(harvesterOutput, pool, a, sugaredLogger) err = harvester.Run() if err != nil { log.Fatal(err) @@ -91,7 +91,7 @@ func main() { time.Sleep(time.Second * 300) } -func worker(output chan harvester3.UpdateCell) { +func worker(output chan harvester.UpdateCell) { for { fmt.Println(<-output) } diff --git a/harvester3/cmd/helper.go b/harvester/cmd/helper.go similarity index 100% rename from harvester3/cmd/helper.go rename to harvester/cmd/helper.go diff --git a/harvester3/cmd/nfts/main.go b/harvester/cmd/nfts/main.go similarity index 83% rename from harvester3/cmd/nfts/main.go rename to harvester/cmd/nfts/main.go index a637eb02..614f7dc2 100644 --- a/harvester3/cmd/nfts/main.go +++ b/harvester/cmd/nfts/main.go @@ -9,9 +9,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/jackc/pgx/v4/pgxpool" - "github.com/momentum-xyz/ubercontroller/harvester3" - "github.com/momentum-xyz/ubercontroller/harvester3/arbitrum_nova_adapter3" - helper "github.com/momentum-xyz/ubercontroller/harvester3/cmd" + "github.com/momentum-xyz/ubercontroller/harvester" + "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + helper "github.com/momentum-xyz/ubercontroller/harvester/cmd" ) func main() { @@ -62,13 +62,13 @@ func main() { wKovi = common.HexToAddress("0xc6220f7F21e15B8886eD38A98496E125b564c414") } - a := arbitrum_nova_adapter3.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) + a := arbitrum_nova_adapter.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) a.Run() - output := make(chan harvester3.UpdateCell) + output := make(chan harvester.UpdateCell) go worker(output) - matrix := harvester3.NewEthers(pool, a, sugaredLogger, output) + matrix := harvester.NewEthers(pool, a, sugaredLogger, output) err = matrix.Run() if err != nil { log.Fatal(err) @@ -92,7 +92,7 @@ func main() { time.Sleep(time.Second * 30) } -func worker(c <-chan harvester3.UpdateCell) { +func worker(c <-chan harvester.UpdateCell) { for { u := <-c fmt.Println("worker", u.Contract, u.Wallet, u.IDs) diff --git a/harvester3/cmd/tokens/main.go b/harvester/cmd/tokens/main.go similarity index 79% rename from harvester3/cmd/tokens/main.go rename to harvester/cmd/tokens/main.go index 00cd8991..8d69d5ef 100644 --- a/harvester3/cmd/tokens/main.go +++ b/harvester/cmd/tokens/main.go @@ -9,9 +9,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/jackc/pgx/v4/pgxpool" - "github.com/momentum-xyz/ubercontroller/harvester3" - "github.com/momentum-xyz/ubercontroller/harvester3/arbitrum_nova_adapter3" - helper "github.com/momentum-xyz/ubercontroller/harvester3/cmd" + "github.com/momentum-xyz/ubercontroller/harvester" + "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" + helper "github.com/momentum-xyz/ubercontroller/harvester/cmd" ) func main() { @@ -49,13 +49,13 @@ func main() { w1 = common.HexToAddress("0x683642c22feDE752415D4793832Ab75EFdF6223c") } - a := arbitrum_nova_adapter3.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) + a := arbitrum_nova_adapter.NewArbitrumNovaAdapter(&cfg.Arbitrum3, sugaredLogger) a.Run() - output := make(chan harvester3.UpdateCell) + output := make(chan harvester.UpdateCell) go worker(output) - matrix := harvester3.NewTokens(pool, a, sugaredLogger, output) + matrix := harvester.NewTokens(pool, a, sugaredLogger, output) err = matrix.Run() if err != nil { log.Fatal(err) @@ -78,7 +78,7 @@ func main() { time.Sleep(time.Second * 30) } -func worker(c <-chan harvester3.UpdateCell) { +func worker(c <-chan harvester.UpdateCell) { for { u := <-c fmt.Println("worker", u.Contract, u.Wallet, u.Block, u.Value) diff --git a/harvester/harvester.go b/harvester/harvester.go deleted file mode 100644 index 939afd9d..00000000 --- a/harvester/harvester.go +++ /dev/null @@ -1,86 +0,0 @@ -package harvester - -import ( - "fmt" - "sync" - - "github.com/jackc/pgx/v4/pgxpool" - "github.com/pkg/errors" -) - -type Harvester struct { - clients *Callbacks - db *pgxpool.Pool - bc map[string]*Table - mu sync.Mutex -} - -func (h *Harvester) SubscribeForWallet(bcType string, wallet, callback Callback) { - //TODO implement me - panic("implement me") -} - -func (h *Harvester) SubscribeForWalletAndContract(bcType string, wallet string, contract string, callback Callback) error { - mu.Lock() - defer mu.Unlock() - table, ok := h.bc[bcType] - if !ok { - return errors.New("failed to find blockchain:" + bcType) - } - - h.clients.Add(bcType, BalanceChange, callback) - table.AddWalletContract(wallet, contract) - - return nil -} - -type Address []byte - -func NewHarvester(db *pgxpool.Pool) *Harvester { - return &Harvester{ - clients: NewCallbacks(), - db: db, - bc: make(map[string]*Table), - } -} - -func (h *Harvester) Init() { - -} - -func (h *Harvester) OnBalanceChange() { - -} - -func (h *Harvester) OnNewBlock(bcType string, block *BCBlock) { - //fmt.Printf("On new block: %+v %+v \n", block.Hash, block.Number) - h.clients.Trigger(bcType, NewBlock, block) -} - -func (h *Harvester) RegisterAdapter(adapter Adapter) error { - _, bcType, _ := adapter.GetInfo() - - h.bc[bcType] = NewTable(h.db, adapter, h.updateHook) - h.bc[bcType].Run() - - return nil -} - -func (h *Harvester) updateHook(bcType string, updates []*UpdateEvent, stakeUpdates []*StakeEvent) { - for _, update := range updates { - fmt.Println("Trigger Balance") - h.clients.Trigger(bcType, BalanceChange, update) - } -} - -func (h *Harvester) Run() error { - return nil -} - -func (h *Harvester) Subscribe(bcType string, eventName Event, callback Callback) { - h.clients.Add(bcType, eventName, callback) -} - -func (h *Harvester) Unsubscribe(bcType string, eventName Event, callback Callback) { - h.clients.Remove(bcType, eventName, callback) -} diff --git a/harvester/interfaces.go b/harvester/interfaces.go index 5bd5cb65..b0189a0e 100644 --- a/harvester/interfaces.go +++ b/harvester/interfaces.go @@ -8,161 +8,39 @@ import ( "github.com/momentum-xyz/ubercontroller/utils/umid" ) +type AdapterListener func(blockNumber uint64) + type TransferERC20Log struct { - From string - To string + Block uint64 + From common.Address + To common.Address Value *big.Int - Contract string -} - -/** - * - * @param user User address - * @param odyssey Odyssey ID that's being staked - * @param amount_staked Amount being staked - * @param token Token used (MOM or DAD) - * @param total_staked Total being staked by the user on the Odyssey - */ -//event Stake(address user, bytes16 odyssey, uint256 amount_staked, Token token, uint256 total_staked); -type StakeLog struct { - LogIndex uint - TxHash string - UserWallet string - OdysseyID umid.UMID - AmountStaked *big.Int - TokenType uint8 - TotalStaked *big.Int -} - -/** - * - * @param user User address - * @param odyssey Odyssey ID that's being unstaked - * @param amount_unstaked Amount unstaked - * @param token Token used (MOM or DAD) - * @param total_staked Total remained staked by the user on that Odyssey - */ -//event Unstake(address user, bytes16 odyssey, uint256 amount_unstaked, Token token, uint256 total_staked); -type UnstakeLog struct { - LogIndex uint - TxHash string - UserWallet string - OdysseyID umid.UMID - AmountUnstaked *big.Int - TokenType uint8 - TotalStaked *big.Int -} - -/** - * - * @param user User address - * @param odyssey_from Odyssey ID that the user is removing stake - * @param odyssey_to Odyssey ID that the user is staking into - * @param amount Amount that's being restaked - * @param token Token used (MOM or DAD) - * @param total_staked_from Total amount of tokens that remains staked on the `odyssey_from` - * @param total_staked_to Total amount of tokens staked on `odyssey_to` - */ -//event Restake(address user, -//bytes16 odyssey_from, -//bytes16 odyssey_to, -//uint256 amount, -//Token token, -//uint256 total_staked_from, -//uint256 total_staked_to); -type RestakeLog struct { - UserWallet string - FromOdysseyID umid.UMID - ToOdysseyID umid.UMID - Amount *big.Int - TokenType uint8 - TotalStakedToFrom *big.Int - TotalStakedToTo *big.Int + Contract common.Address } -/** - * @dev Emitted when `tokenId` token is transferred from `from` to `to`. - */ -//event Transfer(address indexed from, address indexed to, uint256 indexed tokenId); type TransferNFTLog struct { - From string - To string - TokenID umid.UMID - Contract string -} - -type BCBlock struct { - Hash string - Number uint64 -} - -type BCDiff struct { - From string - To string - Token string - Amount *big.Int -} - -type BCStake struct { - From string - OdysseyID umid.UMID - TokenType uint8 //0-MOM; 1-DAD - Amount *big.Int - TotalAmount *big.Int -} - -type UpdateEvent struct { - Wallet string - Contract string - Amount *big.Int + Block uint64 + From common.Address + To common.Address + TokenID common.Hash + Contract common.Address } -type StakeEvent struct { - TxHash string - LogIndex string - Wallet string - Kind uint8 - OdysseyID umid.UMID - Amount *big.Int - ActivityType string - //CreatedAt time.Time +type ChangeEtherLog struct { + Block uint64 + Wallet common.Address + Delta *big.Int } -type NftEvent struct { - From string - To string - OdysseyID umid.UMID - Contract string -} - -type AdapterListener func(blockNumber uint64, diffs []*BCDiff, stakes []*BCStake) - type Adapter interface { GetLastBlockNumber() (uint64, error) - GetBalance(wallet string, contract string, blockNumber uint64) (*big.Int, error) - GetLogs(fromBlock, toBlock int64, addresses []common.Address) ([]any, error) - GetLogsRecursively(fromBlock, toBlock int64, addresses []common.Address, level int) ([]any, error) + GetTokenBalance(contract *common.Address, wallet *common.Address, blockNumber uint64) (*big.Int, uint64, error) + GetNFTBalance(nftContract *common.Address, wallet *common.Address, block uint64) ([]common.Hash, error) + GetEtherBalance(wallet *common.Address, block uint64) (*big.Int, error) + GetTokenLogs(fromBlock, toBlock uint64, addresses []common.Address) ([]any, error) + GetNFTLogs(fromBlock, toBlock uint64, contracts []common.Address) ([]any, error) + GetEtherLogs(fromBlock, toBlock uint64, wallets map[common.Address]bool) ([]ChangeEtherLog, error) RegisterNewBlockListener(f AdapterListener) Run() GetInfo() (umid umid.UMID, name string, rpcURL string) } - -type BCType string - -const Ethereum string = "ethereum" -const Polkadot string = "polkadot" -const ArbitrumNova string = "arbitrum_nova" - -type Event string - -const NewBlock Event = "new_block" -const BalanceChange Event = "balance_change" - -type IHarvester interface { - RegisterAdapter(bcAdapter Adapter) error - OnBalanceChange() - Subscribe(bcType string, eventName Event, callback Callback) - Unsubscribe(bcType string, eventName Event, callback Callback) - SubscribeForWallet(bcType string, wallet, callback Callback) - SubscribeForWalletAndContract(bcType string, wallet string, contract string, callback Callback) error -} diff --git a/harvester/singleton.go b/harvester/singleton.go deleted file mode 100644 index fdb691bc..00000000 --- a/harvester/singleton.go +++ /dev/null @@ -1,34 +0,0 @@ -package harvester - -import ( - "context" - "sync" - - "github.com/jackc/pgx/v4/pgxpool" - "go.uber.org/zap" - - "github.com/momentum-xyz/ubercontroller/config" -) - -var instance *Harvester -var mu sync.Mutex -var logger *zap.SugaredLogger - -func Initialise(ctx context.Context, log *zap.SugaredLogger, cfg *config.Config, pool *pgxpool.Pool) { - mu.Lock() - defer mu.Unlock() - - logger = log - - if instance == nil { - instance = NewHarvester(pool) - } -} - -func GetInstance() *Harvester { - if instance == nil { - logger.Error("Harvester must be initialised") - } - - return instance -} diff --git a/harvester/start.go b/harvester/start.go deleted file mode 100644 index 3a2fa46d..00000000 --- a/harvester/start.go +++ /dev/null @@ -1,63 +0,0 @@ -package harvester - -import ( - "context" - "fmt" - - "github.com/jackc/pgx/v4/pgxpool" - "github.com/pkg/errors" - - "github.com/momentum-xyz/ubercontroller/config" -) - -func SubscribeAllWallets(ctx context.Context, harv *Harvester, cfg *config.Config, pool *pgxpool.Pool) error { - sql := `SELECT value -> 'wallet' AS wallets - FROM user_attribute - WHERE true - AND plugin_id = '86DC3AE7-9F3D-42CB-85A3-A71ABC3C3CB8' - AND attribute_name = 'wallet'` - - rows, err := pool.Query(ctx, sql) - if err != nil { - return err - } - - allWallets := make([]string, 0) - - for rows.Next() { - var wallets []string - - if err := rows.Scan(&wallets); err != nil { - fmt.Println(allWallets) - return errors.WithMessage(err, "failed to scan rows from user_attribute table") - } - - allWallets = append(allWallets, wallets...) - } - - handlerVar := handler - handlerPointer := &handlerVar - - for _, w := range allWallets { - if len(w) != 42 { - // Skip Polkadot address in Hex format - continue - } - - if w[0:2] != "0x" { - // Skip Polkadot addresses - continue - } - - err := harv.SubscribeForWalletAndContract(ArbitrumNova, w, cfg.Arbitrum.MOMTokenAddress, handlerPointer) - if err != nil { - return errors.WithMessage(err, "failed to subscribe wallet/contract") - } - } - - return nil -} - -func handler(p any) { - -} diff --git a/harvester/table.go b/harvester/table.go deleted file mode 100644 index f195e260..00000000 --- a/harvester/table.go +++ /dev/null @@ -1,449 +0,0 @@ -package harvester - -import ( - "context" - "encoding/hex" - "fmt" - "math/big" - "strings" - - "github.com/ethereum/go-ethereum/common" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" - "github.com/pkg/errors" - "github.com/sasha-s/go-deadlock" - - "github.com/momentum-xyz/ubercontroller/types/entry" - "github.com/momentum-xyz/ubercontroller/utils/umid" -) - -type Table struct { - mu deadlock.RWMutex - blockNumber uint64 - data map[string]map[string]*big.Int - stakesData map[umid.UMID]map[string]*big.Int - db *pgxpool.Pool - adapter Adapter - harvesterListener func(bcName string, p []*UpdateEvent, s []*StakeEvent) -} - -func NewTable(db *pgxpool.Pool, adapter Adapter, listener func(bcName string, p []*UpdateEvent, s []*StakeEvent)) *Table { - return &Table{ - blockNumber: 0, - data: make(map[string]map[string]*big.Int), - stakesData: make(map[umid.UMID]map[string]*big.Int), - adapter: adapter, - harvesterListener: listener, - db: db, - } -} - -func (t *Table) Run() { - err := t.LoadFromDB() - if err != nil { - fmt.Println(err) - } - - t.fastForward() - - t.adapter.RegisterNewBlockListener(t.listener) -} - -func (t *Table) fastForward() { - t.mu.Lock() - defer t.mu.Unlock() - - lastBlockNumber, err := t.adapter.GetLastBlockNumber() - fmt.Printf("Fast Forward. From: %d to: %d\n", t.blockNumber, lastBlockNumber) - if err != nil { - fmt.Println(err) - return - } - - if t.blockNumber >= lastBlockNumber { - // Table already processed latest BC block - return - } - - contracts := make([]common.Address, 0) - - //if t.blockNumber == 0 { - // // No blocks processed - // // Initialisation should be done using GetBalance for tokens - // // But for stakes we will use fastForward - // return - //} - - for contract := range t.data { - contracts = append(contracts, common.HexToAddress(contract)) - } - - fmt.Println("Doing Fast Forward") - - //if len(contracts) == 0 { - // return - //} - - logs, err := t.adapter.GetLogs(int64(t.blockNumber)+1, int64(lastBlockNumber), contracts) - if err != nil { - fmt.Println(err) - return - } - - t.ProcessDiffs(lastBlockNumber, logs) -} - -func (t *Table) ProcessDiffs(blockNumber uint64, logs []any) { - //fmt.Printf("Block: %d \n", blockNumber) - //events := make([]*UpdateEvent, 0) - //stakeEvents := make([]*StakeEvent, 0) - // - //for _, diff := range diffs { - // _, ok := t.data[diff.Token] - // if !ok { - // // No such contract - // continue - // } - // b, ok := t.data[diff.Token][diff.From] - // if ok && b != nil { // if - // // From wallet found - // b.Sub(b, diff.Amount) - // events = append(events, &UpdateEvent{ - // Wallet: diff.From, - // Contract: diff.Token, - // Amount: b, // TODO ask should we clone here by value - // }) - // } - // b, ok = t.data[diff.Token][diff.To] - // if ok && b != nil { - // // To wallet found - // b.Add(b, diff.Amount) - // events = append(events, &UpdateEvent{ - // Wallet: diff.To, - // Contract: diff.Token, - // Amount: b, - // }) - // } - //} - // - //for _, stake := range stakes { - // _, ok := t.stakesData[stake.OdysseyID] - // if !ok { - // t.stakesData[stake.OdysseyID] = make(map[string]*big.Int) - // } - // - // t.stakesData[stake.OdysseyID][stake.From] = stake.TotalAmount - // stakeEvents = append(stakeEvents, &StakeEvent{ - // Wallet: stake.From, - // OdysseyID: stake.OdysseyID, - // Amount: stake.TotalAmount, - // }) - //} - // - //t.blockNumber = blockNumber - // - //_, name, _ := t.adapter.GetInfo() - //t.harvesterListener(name, events, stakeEvents) - // - //err := t.SaveToDB(events, stakeEvents) - //if err != nil { - // log.Fatal(err) - //} - //t.Display() -} - -func (t *Table) listener(blockNumber uint64, diffs []*BCDiff, stakes []*BCStake) { - t.fastForward() - //t.mu.Lock() - //t.ProcessDiffs(blockNumber, diffs, stakes) - //t.mu.Unlock() -} - -func (t *Table) SaveToDB(events []*UpdateEvent, stakeEvents []*StakeEvent) (err error) { - wallets := make([]Address, 0) - contracts := make([]Address, 0) - // Save balance by value to quickly unlock mutex, otherwise have to unlock util DB transaction finished - balances := make([]*entry.Balance, 0) - stakeEntries := make([]*entry.Stake, 0) - - blockchainUMID, _, _ := t.adapter.GetInfo() - - for _, event := range events { - if event.Amount == nil { - continue - } - wallets = append(wallets, HexToAddress(event.Wallet)) - contracts = append(contracts, HexToAddress(event.Contract)) - balances = append(balances, &entry.Balance{ - WalletID: HexToAddress(event.Wallet), - ContractID: HexToAddress(event.Contract), - BlockchainID: blockchainUMID, - LastProcessedBlockNumber: t.blockNumber, - Balance: (*entry.BigInt)(event.Amount), - }) - } - - for _, stake := range stakeEvents { - wallets = append(wallets, HexToAddress(stake.Wallet)) - stakeEntries = append(stakeEntries, &entry.Stake{ - WalletID: HexToAddress(stake.Wallet), - BlockchainID: blockchainUMID, - ObjectID: stake.OdysseyID, - LastComment: "", - Amount: (*entry.BigInt)(stake.Amount), - }) - } - - wallets = unique(wallets) - - fmt.Println(stakeEntries) - - return t.saveToDB(wallets, contracts, balances, stakeEntries) -} - -func (t *Table) saveToDB(wallets []Address, contracts []Address, balances []*entry.Balance, stakeEntries []*entry.Stake) error { - blockchainUMID, name, rpcURL := t.adapter.GetInfo() - - tx, err := t.db.BeginTx(context.Background(), pgx.TxOptions{}) - if err != nil { - return errors.WithMessage(err, "failed to begin transaction") - } - defer func() { - if err != nil { - fmt.Println("!!! Rollback") - e := tx.Rollback(context.TODO()) - if e != nil { - fmt.Println("???") - fmt.Println(e) - } - } else { - //fmt.Println("!!! Commit") - e := tx.Commit(context.TODO()) - if e != nil { - fmt.Println("???!!!") - fmt.Println(e) - } - } - }() - - sql := `INSERT INTO blockchain (blockchain_id, last_processed_block_number, blockchain_name, rpc_url, updated_at) - VALUES ($1, $2, $3, $4, NOW()) - ON CONFLICT (blockchain_id) DO UPDATE SET last_processed_block_number=$2, - blockchain_name=$3, - rpc_url=$4, - updated_at=NOW();` - - val := &entry.Blockchain{ - BlockchainID: blockchainUMID, - LastProcessedBlockNumber: t.blockNumber, - BlockchainName: name, - RPCURL: rpcURL, - } - _, err = tx.Exec(context.Background(), sql, - val.BlockchainID, val.LastProcessedBlockNumber, val.BlockchainName, val.RPCURL) - if err != nil { - return errors.WithMessage(err, "failed to insert or update blockchain DB query") - } - - sql = `INSERT INTO wallet (wallet_id, blockchain_id) - VALUES ($1::bytea, $2) - ON CONFLICT (blockchain_id, wallet_id) DO NOTHING ` - for _, w := range wallets { - _, err = tx.Exec(context.Background(), sql, w, blockchainUMID) - if err != nil { - err = errors.WithMessage(err, "failed to insert wallet to DB") - return err - } - } - - sql = `INSERT INTO contract (contract_id, name) - VALUES ($1, $2) - ON CONFLICT (contract_id) DO NOTHING` - for _, c := range contracts { - _, err = tx.Exec(context.TODO(), sql, c, "") - if err != nil { - err = errors.WithMessage(err, "failed to insert contract to DB") - return err - } - } - - sql = `INSERT INTO balance (wallet_id, contract_id, blockchain_id, balance, last_processed_block_number) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (wallet_id, contract_id, blockchain_id) - DO UPDATE SET balance = $4, - last_processed_block_number = $5` - - for _, b := range balances { - _, err = tx.Exec(context.TODO(), sql, - b.WalletID, b.ContractID, b.BlockchainID, b.Balance, b.LastProcessedBlockNumber) - if err != nil { - err = errors.WithMessage(err, "failed to insert balance to DB") - return err - } - } - - sql = `INSERT INTO stake (wallet_id, blockchain_id, object_id, amount, last_comment, updated_at, created_at) - VALUES ($1, $2, $3, $4, $5, NOW(), NOW()) - ON CONFLICT (blockchain_id, wallet_id, object_id) - DO UPDATE SET updated_at = NOW(), - amount = $4` - - for _, s := range stakeEntries { - _, err = tx.Exec(context.TODO(), sql, - s.WalletID, blockchainUMID, s.ObjectID, s.Amount, "") - if err != nil { - err = errors.WithMessage(err, "failed to insert stakes to DB") - return err - } - } - - return nil -} - -func (t *Table) LoadFromDB() error { - blockchainUMID, _, _ := t.adapter.GetInfo() - - tx, err := t.db.BeginTx(context.TODO(), pgx.TxOptions{}) - if err != nil { - return errors.WithMessage(err, "failed to begin transaction") - } - defer func() { - if err != nil { - tx.Rollback(context.TODO()) - } else { - tx.Commit(context.TODO()) - } - }() - - sql := `SELECT last_processed_block_number FROM blockchain WHERE blockchain_id=$1` - - row := tx.QueryRow(context.TODO(), sql, blockchainUMID) - b := &entry.Blockchain{} - - t.mu.Lock() - defer t.mu.Unlock() - - if err := row.Scan(&b.LastProcessedBlockNumber); err != nil { - if err != pgx.ErrNoRows { - return errors.WithMessage(err, "failed to scan row from blockchain table") - } - } - - sql = `SELECT wallet_id, contract_id, balance.balance - FROM balance - WHERE blockchain_id = $1` - - rows, err := tx.Query(context.TODO(), sql, blockchainUMID) - if err != nil { - return err - } - - for rows.Next() { - var wallet common.Address - var contract common.Address - var balance entry.BigInt - - if err := rows.Scan(&wallet, &contract, &balance); err != nil { - return errors.WithMessage(err, "failed to scan rows from balance table") - } - - walletStr := strings.ToLower(wallet.Hex()) - contractStr := strings.ToLower(contract.Hex()) - - _, ok := t.data[contractStr] - if !ok { - t.data[contractStr] = make(map[string]*big.Int) - } - t.data[contractStr][walletStr] = (*big.Int)(&balance) - - //fmt.Println(wallet.Hex(), contract.Hex(), (*big.Int)(&balance).String()) - } - - // If DB transaction fail block will not be updated - t.blockNumber = b.LastProcessedBlockNumber - return nil -} - -func (t *Table) AddWalletContract(wallet string, contract string) { - wallet = strings.ToLower(wallet) - contract = strings.ToLower(contract) - - t.mu.Lock() - _, ok := t.data[contract] - if !ok { - t.data[contract] = make(map[string]*big.Int) - } - _, ok = t.data[contract][wallet] - if !ok { - t.data[contract][wallet] = nil - // Such wallet has not existed so need to get initial balance - go t.syncBalance(wallet, contract) - } - t.mu.Unlock() -} - -func (t *Table) syncBalance(wallet string, contract string) { - wallet = strings.ToLower(wallet) - contract = strings.ToLower(contract) - - t.mu.RLock() - blockNumber, err := t.adapter.GetLastBlockNumber() - t.mu.RUnlock() - if err != nil { - err = errors.WithMessage(err, "failed to get last block number") - fmt.Println(err) - } - balance, err := t.adapter.GetBalance(wallet, contract, blockNumber) - if err != nil { - err = errors.WithMessagef(err, "failed to get balance: %s, %s, %d", wallet, contract, blockNumber) - fmt.Println(err) - } - t.mu.Lock() - if t.blockNumber == 0 || t.blockNumber == blockNumber { - t.data[contract][wallet] = balance - events := make([]*UpdateEvent, 0) - events = append(events, &UpdateEvent{ - Wallet: wallet, - Contract: contract, - Amount: balance, - }) - err := t.SaveToDB(events, nil) - if err != nil { - fmt.Println(err) - } - t.mu.Unlock() - } else { - t.mu.Unlock() - t.syncBalance(wallet, contract) - } -} - -func (t *Table) Display() { - fmt.Println("Display:") - for token, wallets := range t.data { - for wallet, balance := range wallets { - fmt.Printf("%+v %+v %+v \n", token, wallet, balance.String()) - } - } -} - -func HexToAddress(s string) []byte { - b, err := hex.DecodeString(s[2:]) - if err != nil { - panic(err) - } - return b -} - -func unique(slice []Address) []Address { - keys := make(map[string]bool) - list := []Address{} - for _, entry := range slice { - entryStr := hex.EncodeToString(entry) - if _, value := keys[entryStr]; !value { - keys[entryStr] = true - list = append(list, entry) - } - } - return list -} diff --git a/harvester3/arbitrum_nova_adapter3/arbitrum_nova_adapter.go b/harvester3/arbitrum_nova_adapter3/arbitrum_nova_adapter.go deleted file mode 100644 index 82b205a9..00000000 --- a/harvester3/arbitrum_nova_adapter3/arbitrum_nova_adapter.go +++ /dev/null @@ -1,544 +0,0 @@ -package arbitrum_nova_adapter3 - -import ( - "context" - "encoding/json" - "fmt" - "log" - "math" - "math/big" - "strconv" - "strings" - "sync" - "time" - - "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/rpc" - "github.com/pkg/errors" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" - - "github.com/momentum-xyz/ubercontroller/config" - "github.com/momentum-xyz/ubercontroller/harvester3" - "github.com/momentum-xyz/ubercontroller/utils/umid" -) - -type ArbitrumNovaAdapter struct { - listeners []func(blockNumber uint64) - umid umid.UMID - httpURL string - name string - rpcClient *rpc.Client - lastBlock uint64 - - logger *zap.SugaredLogger -} - -func NewArbitrumNovaAdapter(cfg *config.Arbitrum3, logger *zap.SugaredLogger) *ArbitrumNovaAdapter { - return &ArbitrumNovaAdapter{ - umid: umid.MustParse("ccccaaaa-1111-2222-3333-222222222222"), - httpURL: cfg.RPCURL, - name: "arbitrum_nova", - listeners: make([]func(blockNumber uint64), 0), - - logger: logger, - } -} - -func (a *ArbitrumNovaAdapter) GetLastBlockNumber() (uint64, error) { - var resp string - if err := a.rpcClient.Call(&resp, "eth_blockNumber"); err != nil { - return 0, errors.WithMessage(err, "failed to make RPC call to arbitrum:") - } - - return hex2int(resp), nil -} - -func (a *ArbitrumNovaAdapter) Run() { - var err error - a.rpcClient, err = rpc.DialHTTP(a.httpURL) - if err != nil { - a.logger.Error(err) - } - - a.logger.Info("Connected to Arbitrum Block Chain: " + a.httpURL) - - ticker := time.NewTicker(1000 * time.Millisecond) - done := make(chan bool) - go func() { - for { - select { - case <-done: - return - case t := <-ticker.C: - _ = t - n, err := a.GetLastBlockNumber() - if err != nil { - a.logger.Error(err) - } - if a.lastBlock < n { - a.lastBlock = n - for _, listener := range a.listeners { - listener(n) - } - } - } - } - }() -} - -func (a *ArbitrumNovaAdapter) RegisterNewBlockListener(f harvester3.AdapterListener) { - a.listeners = append(a.listeners, f) -} - -func (a *ArbitrumNovaAdapter) GetTokenBalance(contract *common.Address, wallet *common.Address, blockNumber uint64) (*big.Int, uint64, error) { - type request struct { - To string `json:"to"` - Data string `json:"data"` - } - - w := wallet.Hex() - c := contract.Hex() - - // "0x70a08231" - crypto.Keccak256Hash([]byte("balanceOf(address)")).String()[0:10] - data := "0x70a08231" + fmt.Sprintf("%064s", w[2:]) // %064s means that the string is padded with 0 to 32 bytes - req := request{c, data} - - var resp string - n := hexutil.EncodeUint64(blockNumber) - if err := a.rpcClient.Call(&resp, "eth_call", req, n); err != nil { - return nil, 0, errors.WithMessage(err, "failed to make RPC call to arbitrum:") - } - - balance := stringToBigInt(resp) - - return balance, blockNumber, nil -} - -func stringToBigInt(str string) *big.Int { - // remove leading zero of resp - t := strings.TrimLeft(str[2:], "0") - if t == "" { - t = "0" - } - s := "0x" + t - b, err := hexutil.DecodeBig(s) - if err != nil { - log.Fatal(err) - } - return b -} - -func (a *ArbitrumNovaAdapter) GetTransactionMessage(tx *types.Transaction) *core.Message { - msg, err := core.TransactionToMessage(tx, types.LatestSignerForChainID(tx.ChainId()), nil) - if err != nil { - log.Fatal(err) - } - return msg -} - -// refer https://github.com/ethereum/web3.py/blob/master/web3/contract.py#L435 -func (a *ArbitrumNovaAdapter) DecodeTransactionInputData(contractABI *abi.ABI, data []byte) (string, map[string]any, error) { - // The first 4 bytes of the txn represent the ID of the method in the ABI - //fmt.Println(len(data)) - methodSigData := data[:4] - method, err := contractABI.MethodById(methodSigData) - if err != nil { - err = errors.WithMessage(err, "failed to get ABI contract method by id") - return "", nil, err - } - - // parse the inputs to this method - inputsSigData := data[4:] - inputsMap := make(map[string]interface{}) - if err := method.Inputs.UnpackIntoMap(inputsMap, inputsSigData); err != nil { - err = errors.WithMessage(err, "failed to unpack ABI contract method into map") - return "", nil, err - } - //fmt.Printf("Method Name: %s\n", method.Name) - //fmt.Printf("Method inputs: %v\n", MapToJson(inputsMap)) - - return method.Name, inputsMap, nil -} - -func (a *ArbitrumNovaAdapter) GetRawLogs( - topic0 *common.Hash, - topic1 *common.Hash, - topic2 *common.Hash, - addresses []common.Address, - fromBlock *big.Int, - toBlock *big.Int, -) (replies []types.Log, err error) { - args := make(map[string]interface{}) - var topix []any - topix = append(topix, topic0) - topix = append(topix, topic1) - topix = append(topix, topic2) - - args["topics"] = topix - args["address"] = addresses - args["fromBlock"] = hexutil.EncodeBig(fromBlock) - args["toBlock"] = hexutil.EncodeBig(toBlock) - if err != nil { - return - } - err = a.rpcClient.CallContext(context.TODO(), &replies, "eth_getLogs", args) - return -} - -func (a *ArbitrumNovaAdapter) GetEtherBalance(wallet *common.Address, block uint64) (*big.Int, error) { - resp := "" - err := a.rpcClient.Call(&resp, "eth_getBalance", wallet.Hex(), hexutil.EncodeUint64(block)) - if err != nil { - return nil, err - } - - balance := stringToBigInt(resp) - return balance, err -} - -func (a *ArbitrumNovaAdapter) GetEtherLogs(fromBlock, toBlock uint64, wallets map[common.Address]bool) ([]harvester3.ChangeEtherLog, error) { - workers := uint64(20) - chunkSize := math.Ceil(float64((toBlock - fromBlock + 1) / workers)) - if chunkSize == 0 { - chunkSize = 1 - } - - blockNumbers := make([]uint64, 0) - for b := fromBlock; b <= toBlock; b++ { - blockNumbers = append(blockNumbers, b) - } - - chunks := chunkSlice(blockNumbers, uint64(chunkSize)) - - var g errgroup.Group - mu := sync.Mutex{} - logs := make([]harvester3.ChangeEtherLog, 0) - - for _, chunk := range chunks { - func(chunk []uint64) { - g.Go(func() error { - from := chunk[0] - to := chunk[len(chunk)-1] - l, err := a.getEtherLogs(from, to, wallets) - mu.Lock() - logs = append(logs, l...) - mu.Unlock() - return err - }) - }(chunk) - } - - if err := g.Wait(); err != nil { - return nil, err - } - - return logs, nil -} - -func (a *ArbitrumNovaAdapter) getEtherLogs(fromBlock, toBlock uint64, wallets map[common.Address]bool) ([]harvester3.ChangeEtherLog, error) { - logs := make([]harvester3.ChangeEtherLog, 0) - res := make(map[string]any) - - type respTx struct { - Hash string `json:"hash"` - From string `json:"from"` - To *string `json:"to"` - Value string `json:"value"` - Gas string `json:"gas"` - } - - mu := sync.Mutex{} - - for b := fromBlock; b <= toBlock; b++ { - blockNumber := hexutil.EncodeUint64(b) - a.logger.Debugln("eth_getBlockByNumber", fromBlock, toBlock) - err := a.rpcClient.Call(&res, "eth_getBlockByNumber", blockNumber, true) - if err != nil { - return nil, err - } - txs, ok := res["transactions"] - if !ok { - continue - } - txsMap, ok := txs.([]any) - for _, txMap := range txsMap { - jsonData, _ := json.Marshal(txMap) - tx := respTx{} - err = json.Unmarshal(jsonData, &tx) - if err != nil { - return nil, err - } - - to := common.Address{} - if tx.To != nil { - to = common.HexToAddress(*tx.To) - } - from := common.HexToAddress(tx.From) - - _, hasFrom := wallets[from] - _, hasTo := wallets[to] - - if !hasFrom && !hasTo { - continue - } - - if tx.Gas != "0x0" && hasFrom { - func() { - receipt, err := a.eth_getTransactionReceipt(tx.Hash) - if err != nil { - a.logger.Error(err) - return - } - //if receipt.Status != "0x1" { - // return - //} - - gasUsed := big.NewInt(int64(hex2int(receipt.GasUsed))) - gasPrice := big.NewInt(int64(hex2int(receipt.EffectiveGasPrice))) - delta := gasUsed.Mul(gasUsed, gasPrice) - delta = delta.Neg(delta) - - mu.Lock() - logs = append(logs, harvester3.ChangeEtherLog{ - Block: b, - Wallet: common.HexToAddress(receipt.From), - Delta: delta, - }) - mu.Unlock() - }() - } - - if tx.Value == "0x0" { - // Tx fee already counted in previous section - continue - } - - if hasTo { - logs = append(logs, harvester3.ChangeEtherLog{ - Block: b, - Wallet: to, - Delta: big.NewInt(int64(hex2int(tx.Value))), - }) - } - if hasFrom { - delta := big.NewInt(int64(hex2int(tx.Value))) - logs = append(logs, harvester3.ChangeEtherLog{ - Block: b, - Wallet: common.HexToAddress(tx.From), - Delta: delta.Neg(delta), - }) - } - } - } - - return logs, nil -} - -type TransactionReceipt struct { - From string `json:"from"` - Status string `json:"status"` - GasUsed string `json:"gasUsed"` - EffectiveGasPrice string `json:"effectiveGasPrice"` -} - -func (a *ArbitrumNovaAdapter) eth_getTransactionReceipt(hash string) (*TransactionReceipt, error) { - res := TransactionReceipt{} - err := a.rpcClient.Call(&res, "eth_getTransactionReceipt", hash) - if err != nil { - return nil, err - } - return &res, nil -} - -func (a *ArbitrumNovaAdapter) GetNFTLogs(fromBlock, toBlock uint64, contracts []common.Address) ([]any, error) { - logTransferSig := []byte("Transfer(address,address,uint256)") - logTransferSigHash := crypto.Keccak256Hash(logTransferSig) - bcLogs, err := a.GetRawLogs(&logTransferSigHash, nil, nil, contracts, big.NewInt(int64(fromBlock)), big.NewInt(int64(toBlock))) - if err != nil { - return nil, errors.WithMessage(err, "failed to filter log") - } - - logs := make([]any, 0) - - for _, vLog := range bcLogs { - - if len(vLog.Topics) != 4 { - a.logger.Error("Transfer NFT log must have 4 topic items") - continue - } - //fmt.Printf("Log Block Number: %d\n", vLog.BlockNumber) - //fmt.Printf("Log Index: %d\n", vLog.Index) - - var e harvester3.TransferNFTLog - - e.Block = vLog.BlockNumber - e.Contract = vLog.Address - // Hex and Un Hex here used to remove padding zeros - e.From = common.HexToAddress(vLog.Topics[1].Hex()) - e.To = common.HexToAddress(vLog.Topics[2].Hex()) - e.TokenID = vLog.Topics[3] - - logs = append(logs, &e) - } - - return logs, nil -} - -func (a *ArbitrumNovaAdapter) GetTokenLogs(fromBlock, toBlock uint64, contracts []common.Address) ([]any, error) { - logTransferSig := []byte("Transfer(address,address,uint256)") - logTransferSigHash := crypto.Keccak256Hash(logTransferSig) - bcLogs, err := a.GetRawLogs(&logTransferSigHash, nil, nil, contracts, big.NewInt(int64(fromBlock)), big.NewInt(int64(toBlock))) - if err != nil { - return nil, errors.WithMessage(err, "failed to filter log") - } - - logs := make([]any, 0) - - for _, vLog := range bcLogs { - - if len(vLog.Topics) == 4 { - a.logger.Error("Got Transfer NFT log from blockchain in Token contract handler") - continue - } - //fmt.Printf("Log Block Number: %d\n", vLog.BlockNumber) - //fmt.Printf("Log Index: %d\n", vLog.Index) - - var e harvester3.TransferERC20Log - - e.Block = vLog.BlockNumber - e.Contract = vLog.Address - // Hex and Un Hex here used to remove padding zeros - e.From = common.HexToAddress(vLog.Topics[1].Hex()) - e.To = common.HexToAddress(vLog.Topics[2].Hex()) - - data := common.TrimLeftZeroes(vLog.Data) - hex := common.Bytes2Hex(data) - hex = TrimLeftZeroes(hex) - if hex == "" { - a.logger.Error("Got Transfer Token log with empty data") - continue - } - erc20Amount, err := hexutil.DecodeBig("0x" + hex) - if err != nil { - a.logger.Error(err) - } - e.Value = erc20Amount - - logs = append(logs, &e) - } - - return logs, nil -} - -func TrimLeftZeroes(hex string) string { - idx := 0 - for ; idx < len(hex); idx++ { - if hex[idx] != '0' { - break - } - } - return hex[idx:] -} - -func (a *ArbitrumNovaAdapter) GetInfo() (umid umid.UMID, name string, rpcURL string) { - return a.umid, a.name, a.httpURL -} - -func hex2int(hexStr string) uint64 { - // remove 0x suffix if found in the input string - cleaned := strings.Replace(hexStr, "0x", "", -1) - - // base 16 for hexadecimal - result, _ := strconv.ParseUint(cleaned, 16, 64) - return result -} - -func (a *ArbitrumNovaAdapter) GetNFTBalance(nftContract *common.Address, wallet *common.Address, block uint64) ([]common.Hash, error) { - transferString := "Transfer(address,address,uint256)" - transferTopic := common.BytesToHash(crypto.Keccak256([]byte(transferString))) - - if nftContract == nil { - return nil, errors.New("Failed to GetNFTBalance: NFT contract can not be nil") - } - - contracts := []common.Address{ - *nftContract, - } - - logsFrom, err := a.GetRawLogs(&transferTopic, addrToHash(wallet), nil, contracts, big.NewInt(0), big.NewInt(int64(block))) - if err != nil { - return nil, errors.WithMessage(err, "failed to get logs for nft contract") - } - - logsTo, err := a.GetRawLogs(&transferTopic, nil, addrToHash(wallet), contracts, big.NewInt(0), big.NewInt(int64(block))) - if err != nil { - return nil, errors.WithMessage(err, "failed to get logs for nft contract") - } - - m := make(map[common.Hash]int8) - - for _, l := range logsFrom { - id := l.Topics[3] - - _, ok := m[id] - if !ok { - m[id] = 0 - } - - m[id] -= 1 - } - - for _, l := range logsTo { - id := l.Topics[3] - - _, ok := m[id] - if !ok { - m[id] = 0 - } - - m[id] += 1 - } - - ids := make([]common.Hash, 0) - for id, i := range m { - if i != 0 && i != 1 { - a.logger.Error("Failed to parse NFT transfers, Something wrong in blockchain history") - } - if i == 1 { - ids = append(ids, id) - } - } - - return ids, nil -} - -func addrToHash(addr *common.Address) *common.Hash { - if addr == nil { - return nil - } - res := common.HexToHash(addr.Hex()) - return &res -} - -func chunkSlice(slice []uint64, chunkSize uint64) [][]uint64 { - var chunks [][]uint64 - for i := uint64(0); i < uint64(len(slice)); i += chunkSize { - end := i + chunkSize - - // necessary check to avoid slicing beyond - // slice capacity - if end > uint64(len(slice)) { - end = uint64(len(slice)) - } - - chunks = append(chunks, slice[i:end]) - } - - return chunks -} diff --git a/harvester3/interfaces.go b/harvester3/interfaces.go deleted file mode 100644 index 89be1cc1..00000000 --- a/harvester3/interfaces.go +++ /dev/null @@ -1,46 +0,0 @@ -package harvester3 - -import ( - "math/big" - - "github.com/ethereum/go-ethereum/common" - - "github.com/momentum-xyz/ubercontroller/utils/umid" -) - -type AdapterListener func(blockNumber uint64) - -type TransferERC20Log struct { - Block uint64 - From common.Address - To common.Address - Value *big.Int - Contract common.Address -} - -type TransferNFTLog struct { - Block uint64 - From common.Address - To common.Address - TokenID common.Hash - Contract common.Address -} - -type ChangeEtherLog struct { - Block uint64 - Wallet common.Address - Delta *big.Int -} - -type Adapter interface { - GetLastBlockNumber() (uint64, error) - GetTokenBalance(contract *common.Address, wallet *common.Address, blockNumber uint64) (*big.Int, uint64, error) - GetNFTBalance(nftContract *common.Address, wallet *common.Address, block uint64) ([]common.Hash, error) - GetEtherBalance(wallet *common.Address, block uint64) (*big.Int, error) - GetTokenLogs(fromBlock, toBlock uint64, addresses []common.Address) ([]any, error) - GetNFTLogs(fromBlock, toBlock uint64, contracts []common.Address) ([]any, error) - GetEtherLogs(fromBlock, toBlock uint64, wallets map[common.Address]bool) ([]ChangeEtherLog, error) - RegisterNewBlockListener(f AdapterListener) - Run() - GetInfo() (umid umid.UMID, name string, rpcURL string) -} diff --git a/universe/node/nfts.go b/universe/node/nfts.go index 127eb881..e1890695 100644 --- a/universe/node/nfts.go +++ b/universe/node/nfts.go @@ -9,7 +9,7 @@ import ( "github.com/pkg/errors" "go.uber.org/zap/zapcore" - "github.com/momentum-xyz/ubercontroller/harvester" + "github.com/momentum-xyz/ubercontroller/contracter" "github.com/momentum-xyz/ubercontroller/types/entry" "github.com/momentum-xyz/ubercontroller/universe" "github.com/momentum-xyz/ubercontroller/universe/logic/tree" @@ -17,7 +17,11 @@ import ( "github.com/momentum-xyz/ubercontroller/utils/umid" ) -func (n *Node) Listener(bcName string, events []*harvester.UpdateEvent, stakeEvents []*harvester.StakeEvent, nftEvent []*harvester.NftEvent) error { +func (n *Node) Listener(bcName string, + events []*contracter.UpdateEvent, + stakeEvents []*contracter.StakeEvent, + nftEvent []*contracter.NftEvent, + transferEvents []*contracter.TransferOdysseyEvent) error { if n.log.Level() == zapcore.DebugLevel { n.log.Debugln("Table Listener:") for k, v := range events { @@ -78,11 +82,11 @@ func (n *Node) IsLocalWorld(world universe.Object) bool { return n.GetID() == world.GetParent().GetID() } -func (n *Node) AddStakeActivities(stakeEvents []*harvester.StakeEvent) error { +func (n *Node) AddStakeActivities(stakeEvents []*contracter.StakeEvent) error { node := universe.GetNode() activities := node.GetActivities().GetActivities() - newStakeEvents := make([]*harvester.StakeEvent, 0) + newStakeEvents := make([]*contracter.StakeEvent, 0) for _, s := range stakeEvents { exists := false @@ -109,7 +113,7 @@ func (n *Node) AddStakeActivities(stakeEvents []*harvester.StakeEvent) error { return nil } -func (n *Node) AddStakeActivity(stakeEvent *harvester.StakeEvent) error { +func (n *Node) AddStakeActivity(stakeEvent *contracter.StakeEvent) error { node := universe.GetNode() _, ok := n.GetObject(stakeEvent.OdysseyID, true) diff --git a/universe/node/node.go b/universe/node/node.go index 71f36b51..47d13f24 100644 --- a/universe/node/node.go +++ b/universe/node/node.go @@ -18,9 +18,9 @@ import ( "github.com/pkg/errors" "github.com/momentum-xyz/ubercontroller/config" + "github.com/momentum-xyz/ubercontroller/contracter" + "github.com/momentum-xyz/ubercontroller/contracter/arbitrum_nova_adapter" "github.com/momentum-xyz/ubercontroller/database" - "github.com/momentum-xyz/ubercontroller/harvester" - "github.com/momentum-xyz/ubercontroller/harvester/arbitrum_nova_adapter" "github.com/momentum-xyz/ubercontroller/mplugin" "github.com/momentum-xyz/ubercontroller/pkg/media" "github.com/momentum-xyz/ubercontroller/seed" @@ -304,7 +304,7 @@ func (n *Node) Run() error { } defer pool.Close() - t := harvester.NewTable2(pool, adapter, n.Listener) + t := contracter.NewTable(pool, adapter, n.Listener) t.Run() }