Skip to content

Commit

Permalink
introduce stakepoold connection manager
Browse files Browse the repository at this point in the history
  • Loading branch information
jholdstock committed May 23, 2019
1 parent 3d621ef commit 77e8205
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 201 deletions.
114 changes: 35 additions & 79 deletions controllers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ import (
"github.com/gorilla/csrf"
"github.com/zenazn/goji/web"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
)

const (
Expand All @@ -61,7 +59,7 @@ type MainController struct {
closePoolMsg string
enableStakepoold bool
feeXpub *hdkeychain.ExtendedKey
grpcConnections []*grpc.ClientConn
StakepooldServers *stakepooldclient.StakepooldManager
poolEmail string
poolFees float64
poolLink string
Expand Down Expand Up @@ -102,7 +100,7 @@ func getClientIP(r *http.Request, realIPHeader string) string {
func NewMainController(params *chaincfg.Params, adminIPs []string,
adminUserIDs []string, APISecret string, APIVersionsSupported []int,
baseURL string, closePool bool, closePoolMsg string, enablestakepoold bool,
feeXpubStr string, grpcConnections []*grpc.ClientConn, poolFees float64,
feeXpubStr string, stakepooldConnMan *stakepooldclient.StakepooldManager, poolFees float64,
poolEmail, poolLink string, emailSender email.Sender, walletHosts, walletCerts, walletUsers,
walletPasswords []string, minServers int, realIPHeader,
votingXpubStr string, maxVotedAge int64, description string,
Expand Down Expand Up @@ -146,7 +144,7 @@ func NewMainController(params *chaincfg.Params, adminIPs []string,
closePoolMsg: closePoolMsg,
enableStakepoold: enablestakepoold,
feeXpub: feeKey,
grpcConnections: grpcConnections,
StakepooldServers: stakepooldConnMan,
poolEmail: poolEmail,
poolFees: poolFees,
poolLink: poolLink,
Expand Down Expand Up @@ -303,13 +301,9 @@ func (controller *MainController) APIAddress(c web.C, r *http.Request) ([]string

// Import the RedeemScript
var importedHeight int64
for i := range controller.grpcConnections {
importedHeight, err = stakepooldclient.StakepooldImportScript(controller.grpcConnections[i], serializedScript)
if err != nil {
log.Errorf("Error importing script on stakepoold rpc connection %d", i)
return nil, codes.Unavailable, "system error", errors.New("unable to process wallet commands")
}
log.Infof("Successfully imported script on stakepoold rpc connection %d", i)
importedHeight, err = controller.StakepooldServers.ImportScript(serializedScript)
if err != nil {
return nil, codes.Unavailable, "system error", errors.New("unable to process wallet commands")
}

userFeeAddr, err := controller.FeeAddressForUserID(int(user.Id))
Expand Down Expand Up @@ -472,25 +466,6 @@ func (controller *MainController) isAdmin(c web.C, r *http.Request) (bool, error
return true, nil
}

// StakepooldGetIgnoredLowFeeTickets performs a gRPC GetIgnoredLowFeeTickets
// request against all stakepoold instances and returns the first result fetched
// without errors
func (controller *MainController) StakepooldGetIgnoredLowFeeTickets() (map[chainhash.Hash]string, error) {
var err error
ignoredLowFeeTickets := make(map[chainhash.Hash]string)

// TODO need some better code here
for i := range controller.grpcConnections {
ignoredLowFeeTickets, err = stakepooldclient.StakepooldGetIgnoredLowFeeTickets(controller.grpcConnections[i])
// take the first non-error result
if err == nil {
return ignoredLowFeeTickets, err
}
}

return ignoredLowFeeTickets, err
}

// StakepooldUpdateTickets attempts to trigger all connected stakepoold
// instances to pull a data update of the specified kind.
func (controller *MainController) StakepooldUpdateTickets(dbMap *gorp.DbMap) error {
Expand All @@ -499,15 +474,10 @@ func (controller *MainController) StakepooldUpdateTickets(dbMap *gorp.DbMap) err
return err
}

for i := range controller.grpcConnections {
err := stakepooldclient.StakepooldSetAddedLowFeeTickets(controller.grpcConnections[i], votableLowFeeTickets)
if err != nil {
log.Errorf("stakepoold host %d unable to update manual "+
"tickets grpc error: %v", i, err)
return err
}
log.Infof("Successfully triggered update tickets on stakepoold "+
"host %d", i)
err = controller.StakepooldServers.SetAddedLowFeeTickets(votableLowFeeTickets)
if err != nil {
log.Errorf("error updating tickets on stakepoold: %v", err)
return err
}

return nil
Expand All @@ -523,15 +493,10 @@ func (controller *MainController) StakepooldUpdateUsers(dbMap *gorp.DbMap) error
return err
}

for i := range controller.grpcConnections {
err := stakepooldclient.StakepooldSetUserVotingPrefs(controller.grpcConnections[i], allUsers)
if err != nil {
log.Errorf("stakepoold host %d unable to update voting config "+
"grpc error: %v", i, err)
return err
}
log.Infof("successfully triggered update users on stakepoold "+
"host %d", i)
err = controller.StakepooldServers.SetUserVotingPrefs(allUsers)
if err != nil {
log.Errorf("error updating users on stakepoold: %v", err)
return err
}

return nil
Expand Down Expand Up @@ -855,13 +820,9 @@ func (controller *MainController) AddressPost(c web.C, r *http.Request) (string,

// Import the RedeemScript
var importedHeight int64
for i := range controller.grpcConnections {
importedHeight, err = stakepooldclient.StakepooldImportScript(controller.grpcConnections[i], serializedScript)
if err != nil {
log.Errorf("Error importing script on stakepoold rpc connection %d", i)
return "/error", http.StatusSeeOther
}
log.Infof("Successfully imported script on stakepoold rpc connection %d", i)
importedHeight, err = controller.StakepooldServers.ImportScript(serializedScript)
if err != nil {
return "/error", http.StatusSeeOther
}

// Get the pool fees address for this user
Expand Down Expand Up @@ -894,28 +855,16 @@ func (controller *MainController) AdminStatus(c web.C, r *http.Request) (string,
}

type stakepooldInfoPage struct {
Status string
}

stakepooldPageInfo := make([]stakepooldInfoPage, len(controller.grpcConnections))

for i, conn := range controller.grpcConnections {
grpcStatus := "Unknown"
state := conn.GetState()
switch state {
case connectivity.Idle:
grpcStatus = "Idle"
case connectivity.Shutdown:
grpcStatus = "Shutdown"
case connectivity.Ready:
grpcStatus = "Ready"
case connectivity.Connecting:
grpcStatus = "Connecting"
case connectivity.TransientFailure:
grpcStatus = "TransientFailure"
}
RPCStatus string
}

stakepooldRPCStatus := controller.StakepooldServers.RPCStatus()

stakepooldPageInfo := make([]stakepooldInfoPage, len(stakepooldRPCStatus))

for i, grpcStatus := range stakepooldRPCStatus {
stakepooldPageInfo[i] = stakepooldInfoPage{
Status: grpcStatus,
RPCStatus: grpcStatus,
}
}

Expand Down Expand Up @@ -1021,6 +970,12 @@ func (controller *MainController) AdminTickets(c web.C, r *http.Request) (string
}
}

ignoredLowFeeTickets, err := controller.StakepooldServers.GetIgnoredLowFeeTickets()
if err != nil {
log.Errorf("Could not retrieve ignored low fee tickets from stakepoold: %v", err)
session.AddFlash("Could not retrieve ignored low fee tickets from stakepoold", "adminTicketsError")
}

c.Env["Admin"] = isAdmin
c.Env["IsAdminTickets"] = true
c.Env["Network"] = controller.getNetworkName()
Expand All @@ -1029,7 +984,8 @@ func (controller *MainController) AdminTickets(c web.C, r *http.Request) (string
c.Env["FlashSuccess"] = session.Flashes("adminTicketsSuccess")

c.Env["AddedLowFeeTickets"] = votableLowFeeTickets
c.Env["IgnoredLowFeeTickets"], _ = controller.StakepooldGetIgnoredLowFeeTickets()
c.Env["IgnoredLowFeeTickets"] = ignoredLowFeeTickets

widgets := controller.Parse(t, "admin/tickets", c.Env)

c.Env["Title"] = "Decred Voting Service - Tickets (Admin)"
Expand Down Expand Up @@ -1093,7 +1049,7 @@ func (controller *MainController) AdminTicketsPost(c web.C, r *http.Request) (st
switch action {
case "add":
actionVerb = "added"
ignoredLowFeeTickets, err := controller.StakepooldGetIgnoredLowFeeTickets()
ignoredLowFeeTickets, err := controller.StakepooldServers.GetIgnoredLowFeeTickets()
if err != nil {
session.AddFlash("GetIgnoredLowFeeTickets error: "+err.Error(),
"adminTicketsError")
Expand Down
47 changes: 18 additions & 29 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"os"
"strings"

"google.golang.org/grpc"

"github.com/gorilla/context"
"github.com/gorilla/csrf"

Expand Down Expand Up @@ -86,15 +84,13 @@ func runMain() int {
// Supported API versions are advertised in the API stats result
APIVersionsSupported := []int{1, 2}

grpcConnections := make([]*grpc.ClientConn, len(cfg.StakepooldHosts))
var stakepooldConnMan *stakepooldclient.StakepooldManager

if cfg.EnableStakepoold {
for i := range cfg.StakepooldHosts {
grpcConnections[i], err = stakepooldclient.ConnectStakepooldGRPC(cfg.StakepooldHosts, cfg.StakepooldCerts, i)
if err != nil {
log.Errorf("Failed to connect to stakepoold host %d: %v", i, err)
return 8
}
stakepooldConnMan, err = stakepooldclient.ConnectStakepooldGRPC(cfg.StakepooldHosts, cfg.StakepooldCerts)
if err != nil {
log.Errorf("Failed to connect to stakepoold host: %v", err)
return 8
}
}

Expand All @@ -108,7 +104,7 @@ func runMain() int {
controller, err := controllers.NewMainController(activeNetParams.Params,
cfg.AdminIPs, cfg.AdminUserIDs, cfg.APISecret, APIVersionsSupported,
cfg.BaseURL, cfg.ClosePool, cfg.ClosePoolMsg, cfg.EnableStakepoold,
cfg.ColdWalletExtPub, grpcConnections, cfg.PoolFees, cfg.PoolEmail,
cfg.ColdWalletExtPub, stakepooldConnMan, cfg.PoolFees, cfg.PoolEmail,
cfg.PoolLink, sender, cfg.WalletHosts, cfg.WalletCerts,
cfg.WalletUsers, cfg.WalletPasswords, cfg.MinServers, cfg.RealIPHeader,
cfg.VotingWalletExtPub, cfg.MaxVotedAge, cfg.Description, cfg.Designation)
Expand Down Expand Up @@ -143,25 +139,18 @@ func runMain() int {
log.Errorf("StakepooldUpdateTickets failed: %v", err)
return 9
}
for i := range grpcConnections {
addedLowFeeTickets, err := stakepooldclient.StakepooldGetAddedLowFeeTickets(grpcConnections[i])
if err != nil {
log.Errorf("GetAddedLowFeeTickets failed on host %d: %v", i, err)
return 9
}
ignoredLowFeeTickets, err := stakepooldclient.StakepooldGetIgnoredLowFeeTickets(grpcConnections[i])
if err != nil {
log.Errorf("GetIgnoredLowFeeTickets failed on host %d: %v", i, err)
return 9
}
liveTickets, err := stakepooldclient.StakepooldGetLiveTickets(grpcConnections[i])
if err != nil {
log.Errorf("GetLiveTickets failed on host %d: %v", i, err)
return 9
}
log.Infof("stakepoold %d reports ticket totals of AddedLowFee %v "+
"IgnoredLowFee %v Live %v", i, len(addedLowFeeTickets),
len(ignoredLowFeeTickets), len(liveTickets))
// Log the reported count of ignored/added/live tickets from each stakepoold
_, err = controller.StakepooldServers.GetIgnoredLowFeeTickets()
if err != nil {
return 9
}
_, err = controller.StakepooldServers.GetAddedLowFeeTickets()
if err != nil {
return 9
}
_, err = controller.StakepooldServers.GetLiveTickets()
if err != nil {
return 9
}
}

Expand Down
Loading

0 comments on commit 77e8205

Please sign in to comment.