Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove globals and init() side-effects from blockchain package #3

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 8 additions & 61 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ package blockchain

import (
"bytes"
"crypto/ecdsa"
"encoding/hex"
"encoding/json"

"github.com/coreos/bbolt"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/singnet/snet-daemon/config"
"github.com/singnet/snet-daemon/db"
log "github.com/sirupsen/logrus"
Expand All @@ -31,66 +29,15 @@ type jobInfo struct {
jobSignatureBytes []byte
}

var (
ethClient *ethclient.Client
rawClient *rpc.Client
agent *Agent
privateKey *ecdsa.PrivateKey
address string
jobCompletionQueue chan *jobInfo
)

func init() {
if config.GetBool(config.BlockchainEnabledKey) {
// Setup ethereum client
if client, err := rpc.Dial(config.GetString(config.EthereumJsonRpcEndpointKey)); err != nil {
log.WithError(err).Panic("error creating rpc client")
} else {
rawClient = client
ethClient = ethclient.NewClient(rawClient)
}

// Setup agent
if a, err := NewAgent(common.HexToAddress(config.GetString(config.AgentContractAddressKey)), ethClient); err != nil {
log.WithError(err).Panic("error instantiating agent")
} else {
agent = a
}

// Setup identity
if privateKeyString := config.GetString(config.PrivateKeyKey); privateKeyString != "" {
if privKey, err := crypto.HexToECDSA(privateKeyString); err != nil {
log.WithError(err).Panic("error getting private key")
} else {
privateKey = privKey
address = crypto.PubkeyToAddress(privateKey.PublicKey).Hex()
}
} else if hdwalletMnemonic := config.GetString(config.HdwalletMnemonicKey); hdwalletMnemonic != "" {
if privKey, err := derivePrivateKey(hdwalletMnemonic, 44, 60, 0, 0, uint32(config.GetInt(config.HdwalletIndexKey))); err != nil {
log.WithError(err).Panic("error deriving private key")
} else {
privateKey = privKey
address = crypto.PubkeyToAddress(privateKey.PublicKey).Hex()
}
}

// Start event and job completion routines
jobCompletionQueue = make(chan *jobInfo, 1000)
go processJobCompletions()
go processEvents()
go submitOldJobsForCompletion()
}
}

func GetGrpcStreamInterceptor() grpc.StreamServerInterceptor {
func (p Processor) GrpcStreamInterceptor() grpc.StreamServerInterceptor {
if config.GetBool(config.BlockchainEnabledKey) {
return jobValidationInterceptor
return p.jobValidationInterceptor
} else {
return noOpInterceptor
}
}

func IsValidJobInvocation(jobAddressBytes, jobSignatureBytes []byte) bool {
func (p Processor) IsValidJobInvocation(jobAddressBytes, jobSignatureBytes []byte) bool {
log := log.WithFields(log.Fields{
"jobAddress": common.BytesToAddress(jobAddressBytes).Hex(),
"jobSignature": hex.EncodeToString(jobSignatureBytes)})
Expand Down Expand Up @@ -136,9 +83,9 @@ func IsValidJobInvocation(jobAddressBytes, jobSignatureBytes []byte) bool {
log.Debug("unable to validate job invocation locally; falling back to on-chain validation")

// Fall back to on-chain validation
if validated, err := agent.ValidateJobInvocation(&bind.CallOpts{
if validated, err := p.agent.ValidateJobInvocation(&bind.CallOpts{
Pending: true,
From: common.HexToAddress(address)}, common.BytesToAddress(jobAddressBytes), v, r, s); err != nil {
From: common.HexToAddress(p.address)}, common.BytesToAddress(jobAddressBytes), v, r, s); err != nil {
log.WithError(err).Error("error validating job on chain")
return false
} else if !validated {
Expand All @@ -150,7 +97,7 @@ func IsValidJobInvocation(jobAddressBytes, jobSignatureBytes []byte) bool {
return true
}

func CompleteJob(jobAddressBytes, jobSignatureBytes []byte) {
func (p Processor) CompleteJob(jobAddressBytes, jobSignatureBytes []byte) {
job := &db.Job{}

// Mark the job completed in the db synchronously
Expand Down Expand Up @@ -178,5 +125,5 @@ func CompleteJob(jobAddressBytes, jobSignatureBytes []byte) {
}

// Submit the job for completion
jobCompletionQueue <- &jobInfo{jobAddressBytes, jobSignatureBytes}
p.jobCompletionQueue <- &jobInfo{jobAddressBytes, jobSignatureBytes}
}
6 changes: 3 additions & 3 deletions blockchain/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"google.golang.org/grpc/status"
)

func jobValidationInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
func (p Processor) jobValidationInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {

md, ok := metadata.FromIncomingContext(ss.Context())
Expand All @@ -30,10 +30,10 @@ func jobValidationInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.

jobSignatureBytes := common.FromHex(jobSignatureMd[0])

if IsValidJobInvocation(jobAddressBytes, jobSignatureBytes) {
if p.IsValidJobInvocation(jobAddressBytes, jobSignatureBytes) {
err := handler(srv, ss)
if err == nil {
CompleteJob(jobAddressBytes, jobSignatureBytes)
p.CompleteJob(jobAddressBytes, jobSignatureBytes)
}
return err
}
Expand Down
104 changes: 88 additions & 16 deletions blockchain/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,94 @@ package blockchain

import (
"context"
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"math/big"
"strings"
"time"

"github.com/coreos/bbolt"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/singnet/snet-daemon/config"
"github.com/singnet/snet-daemon/db"
log "github.com/sirupsen/logrus"
"math/big"
"strings"
"time"
)

func processJobCompletions() {
for jobInfo := range jobCompletionQueue {
type Processor struct {
ethClient *ethclient.Client
rawClient *rpc.Client
agent *Agent
privateKey *ecdsa.PrivateKey
address string
jobCompletionQueue chan *jobInfo
}

// NewProcessor creates a new blockchain processor
func NewProcessor() Processor {
// TODO(aiden) accept configuration as a parameter

p := Processor{
jobCompletionQueue: make(chan *jobInfo, 1000),
}

if !config.GetBool(config.BlockchainEnabledKey) {
return p
}

// Setup ethereum client
if client, err := rpc.Dial(config.GetString(config.EthereumJsonRpcEndpointKey)); err != nil {
// TODO(ai): return (processor, error) instead of panic
log.WithError(err).Panic("error creating rpc client")
} else {
p.rawClient = client
p.ethClient = ethclient.NewClient(client)
}

// Setup agent
if a, err := NewAgent(common.HexToAddress(config.GetString(config.AgentContractAddressKey)), p.ethClient); err != nil {
// TODO(ai): remove panic
log.WithError(err).Panic("error instantiating agent")
} else {
p.agent = a
}

// Setup identity
if privateKeyString := config.GetString(config.PrivateKeyKey); privateKeyString != "" {
if privKey, err := crypto.HexToECDSA(privateKeyString); err != nil {
// TODO(ai): remove panic
log.WithError(err).Panic("error getting private key")
} else {
p.privateKey = privKey
p.address = crypto.PubkeyToAddress(p.privateKey.PublicKey).Hex()
}
} else if hdwalletMnemonic := config.GetString(config.HdwalletMnemonicKey); hdwalletMnemonic != "" {
if privKey, err := derivePrivateKey(hdwalletMnemonic, 44, 60, 0, 0, uint32(config.GetInt(config.HdwalletIndexKey))); err != nil {
log.WithError(err).Panic("error deriving private key")
} else {
p.privateKey = privKey
p.address = crypto.PubkeyToAddress(p.privateKey.PublicKey).Hex()
}
}

return p
}

// StartLoops starts background processing for event and job completion routines
func (p Processor) StartLoop() {
go p.processJobCompletions()
go p.processEvents()
go p.submitOldJobsForCompletion()
}

func (p Processor) processJobCompletions() {
for jobInfo := range p.jobCompletionQueue {
log := log.WithFields(log.Fields{"jobAddress": common.BytesToAddress(jobInfo.jobAddressBytes).Hex(),
"jobSignature": hex.EncodeToString(jobInfo.jobSignatureBytes)})

Expand All @@ -28,19 +99,19 @@ func processJobCompletions() {
log.WithError(err).Error("error parsing job signature")
}

auth := bind.NewKeyedTransactor(privateKey)
auth := bind.NewKeyedTransactor(p.privateKey)

log.Debug("submitting transaction to complete job")
if txn, err := agent.CompleteJob(&bind.TransactOpts{
From: common.HexToAddress(address),
if txn, err := p.agent.CompleteJob(&bind.TransactOpts{
From: common.HexToAddress(p.address),
Signer: auth.Signer,
GasLimit: 1000000}, common.BytesToAddress(jobInfo.jobAddressBytes), v, r, s); err != nil {
log.WithError(err).Error("error submitting transaction to complete job")
} else {
isPending := true

for {
if _, isPending, _ = ethClient.TransactionByHash(context.Background(), txn.Hash()); !isPending {
if _, isPending, _ = p.ethClient.TransactionByHash(context.Background(), txn.Hash()); !isPending {
break
}
time.Sleep(time.Second * 1)
Expand All @@ -49,7 +120,7 @@ func processJobCompletions() {
}
}

func processEvents() {
func (p Processor) processEvents() {
sleepSecs := config.GetDuration(config.PollSleepSecsKey)
agentContractAddress := config.GetString(config.AgentContractAddressKey)

Expand All @@ -70,7 +141,7 @@ func processEvents() {
// We have to do a raw call because the standard method of ethClient.HeaderByNumber(ctx, nil) errors on
// unmarshaling the response currently. See https://github.com/ethereum/go-ethereum/issues/3230
var currentBlockHex string
if err = rawClient.CallContext(context.Background(), &currentBlockHex, "eth_blockNumber"); err != nil {
if err = p.rawClient.CallContext(context.Background(), &currentBlockHex, "eth_blockNumber"); err != nil {
log.WithError(err).Error("error determining current block")
continue
}
Expand All @@ -92,8 +163,9 @@ func processEvents() {
fromBlock := new(big.Int).Add(lastBlock, new(big.Int).SetUint64(1))

// If fromBlock <= currentBlock
// TODO(aiden) invert logic and early return
if fromBlock.Cmp(currentBlock) <= 0 {
if jobCreatedLogs, err := ethClient.FilterLogs(context.Background(), ethereum.FilterQuery{
if jobCreatedLogs, err := p.ethClient.FilterLogs(context.Background(), ethereum.FilterQuery{
FromBlock: fromBlock,
ToBlock: currentBlock,
Addresses: []common.Address{common.HexToAddress(agentContractAddress)},
Expand Down Expand Up @@ -132,7 +204,7 @@ func processEvents() {
log.WithError(err).Error("error getting job created logs")
}

if jobFundedLogs, err := ethClient.FilterLogs(context.Background(), ethereum.FilterQuery{
if jobFundedLogs, err := p.ethClient.FilterLogs(context.Background(), ethereum.FilterQuery{
FromBlock: fromBlock,
ToBlock: currentBlock,
Addresses: []common.Address{common.HexToAddress(agentContractAddress)},
Expand Down Expand Up @@ -169,7 +241,7 @@ func processEvents() {
log.WithError(err).Error("error getting job funded logs")
}

if jobCompletedLogs, err := ethClient.FilterLogs(context.Background(), ethereum.FilterQuery{
if jobCompletedLogs, err := p.ethClient.FilterLogs(context.Background(), ethereum.FilterQuery{
FromBlock: fromBlock,
ToBlock: currentBlock,
Addresses: []common.Address{common.HexToAddress(agentContractAddress)},
Expand Down Expand Up @@ -206,7 +278,7 @@ func processEvents() {
}
}

func submitOldJobsForCompletion() {
func (p Processor) submitOldJobsForCompletion() {
db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(db.JobBucketName)
bucket.ForEach(func(k, v []byte) error {
Expand All @@ -217,7 +289,7 @@ func submitOldJobsForCompletion() {
"jobAddress": common.BytesToAddress(job.JobAddress).Hex(),
"jobSignature": hex.EncodeToString(job.JobSignature),
}).Debug("completing old job found in db")
jobCompletionQueue <- &jobInfo{job.JobAddress, job.JobSignature}
p.jobCompletionQueue <- &jobInfo{job.JobAddress, job.JobSignature}
}
return nil
})
Expand Down
10 changes: 6 additions & 4 deletions handler/handler.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package handler

import (
"net/http"
"net/url"

"github.com/singnet/snet-daemon/blockchain"
"github.com/singnet/snet-daemon/config"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"net/http"
"net/url"
)

var (
Expand Down Expand Up @@ -55,6 +57,6 @@ func GetGrpcHandler() grpc.StreamHandler {
return grpcLoopback
}

func GetHttpHandler() http.HandlerFunc {
return httpToHttp
func GetHttpHandler(bp blockchain.Processor) http.Handler {
return httpToHttp(bp)
}
21 changes: 16 additions & 5 deletions handler/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,24 @@ package handler
import (
"bytes"
"encoding/json"
"github.com/ethereum/go-ethereum/common"
"github.com/singnet/snet-daemon/blockchain"
"io/ioutil"
"net/http"

"github.com/ethereum/go-ethereum/common"
"github.com/singnet/snet-daemon/blockchain"
)

func httpToHttp(resp http.ResponseWriter, req *http.Request) {
type httpHandler struct {
bp blockchain.Processor
}

func httpToHttp(blockProc blockchain.Processor) http.Handler {
return httpHandler{
bp: blockProc,
}
}

func (h httpHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
var jobAddress, jobSignature string
var jobAddressBytes, jobSignatureBytes []byte

Expand Down Expand Up @@ -44,7 +55,7 @@ func httpToHttp(resp http.ResponseWriter, req *http.Request) {

jobAddressBytes, jobSignatureBytes = common.FromHex(jobAddress), common.FromHex(jobSignature)

if !blockchain.IsValidJobInvocation(jobAddressBytes, jobSignatureBytes) {
if !h.bp.IsValidJobInvocation(jobAddressBytes, jobSignatureBytes) {
http.Error(resp, "job invocation not valid", http.StatusUnauthorized)
return
}
Expand Down Expand Up @@ -83,6 +94,6 @@ func httpToHttp(resp http.ResponseWriter, req *http.Request) {
}

if blockchainEnabled {
blockchain.CompleteJob(jobAddressBytes, jobSignatureBytes)
h.bp.CompleteJob(jobAddressBytes, jobSignatureBytes)
}
}
Loading