Skip to content

Commit

Permalink
rpc: reformat method signatures and use a context (tendermint#8377)
Browse files Browse the repository at this point in the history
I was digging around over here, and thought it'd be good to
cleanup/standardize the line formating on a few of these methods. Also
found a few cases where we could use contexts better so did a little
bit of cleanup there too!
  • Loading branch information
tychoish authored Apr 19, 2022
1 parent e4991fd commit 29e5fbc
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 135 deletions.
3 changes: 1 addition & 2 deletions rpc/client/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,5 @@ type MissedItemsError struct {

// Error satisfies the error interface.
func (e *MissedItemsError) Error() string {
return fmt.Sprintf("missed events matching %q between %q and %q",
e.Query, e.NewestSeen, e.OldestPresent)
return fmt.Sprintf("missed events matching %q between %q and %q", e.Query, e.NewestSeen, e.OldestPresent)
}
83 changes: 14 additions & 69 deletions rpc/client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,19 +207,11 @@ func (c *baseRPCClient) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo
return result, nil
}

func (c *baseRPCClient) ABCIQuery(
ctx context.Context,
path string,
data bytes.HexBytes,
) (*coretypes.ResultABCIQuery, error) {
func (c *baseRPCClient) ABCIQuery(ctx context.Context, path string, data bytes.HexBytes) (*coretypes.ResultABCIQuery, error) {
return c.ABCIQueryWithOptions(ctx, path, data, rpcclient.DefaultABCIQueryOptions)
}

func (c *baseRPCClient) ABCIQueryWithOptions(
ctx context.Context,
path string,
data bytes.HexBytes,
opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) {
func (c *baseRPCClient) ABCIQueryWithOptions(ctx context.Context, path string, data bytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) {
result := new(coretypes.ResultABCIQuery)
if err := c.caller.Call(ctx, "abci_query", abciQueryArgs{
Path: path,
Expand All @@ -232,48 +224,31 @@ func (c *baseRPCClient) ABCIQueryWithOptions(
return result, nil
}

func (c *baseRPCClient) BroadcastTxCommit(
ctx context.Context,
tx types.Tx,
) (*coretypes.ResultBroadcastTxCommit, error) {
func (c *baseRPCClient) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*coretypes.ResultBroadcastTxCommit, error) {
result := new(coretypes.ResultBroadcastTxCommit)
if err := c.caller.Call(ctx, "broadcast_tx_commit", txArgs{Tx: tx}, result); err != nil {
return nil, err
}
return result, nil
}

func (c *baseRPCClient) BroadcastTxAsync(
ctx context.Context,
tx types.Tx,
) (*coretypes.ResultBroadcastTx, error) {
func (c *baseRPCClient) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*coretypes.ResultBroadcastTx, error) {
return c.broadcastTX(ctx, "broadcast_tx_async", tx)
}

func (c *baseRPCClient) BroadcastTxSync(
ctx context.Context,
tx types.Tx,
) (*coretypes.ResultBroadcastTx, error) {
func (c *baseRPCClient) BroadcastTxSync(ctx context.Context, tx types.Tx) (*coretypes.ResultBroadcastTx, error) {
return c.broadcastTX(ctx, "broadcast_tx_sync", tx)
}

func (c *baseRPCClient) broadcastTX(
ctx context.Context,
route string,
tx types.Tx,
) (*coretypes.ResultBroadcastTx, error) {
func (c *baseRPCClient) broadcastTX(ctx context.Context, route string, tx types.Tx) (*coretypes.ResultBroadcastTx, error) {
result := new(coretypes.ResultBroadcastTx)
if err := c.caller.Call(ctx, route, txArgs{Tx: tx}, result); err != nil {
return nil, err
}
return result, nil
}

func (c *baseRPCClient) UnconfirmedTxs(
ctx context.Context,
page *int,
perPage *int,
) (*coretypes.ResultUnconfirmedTxs, error) {
func (c *baseRPCClient) UnconfirmedTxs(ctx context.Context, page *int, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) {
result := new(coretypes.ResultUnconfirmedTxs)

if err := c.caller.Call(ctx, "unconfirmed_txs", unconfirmedArgs{Page: page, PerPage: perPage}, result); err != nil {
Expand Down Expand Up @@ -329,10 +304,7 @@ func (c *baseRPCClient) ConsensusState(ctx context.Context) (*coretypes.ResultCo
return result, nil
}

func (c *baseRPCClient) ConsensusParams(
ctx context.Context,
height *int64,
) (*coretypes.ResultConsensusParams, error) {
func (c *baseRPCClient) ConsensusParams(ctx context.Context, height *int64) (*coretypes.ResultConsensusParams, error) {
result := new(coretypes.ResultConsensusParams)
if err := c.caller.Call(ctx, "consensus_params", heightArgs{Height: height}, result); err != nil {
return nil, err
Expand All @@ -356,11 +328,7 @@ func (c *baseRPCClient) Health(ctx context.Context) (*coretypes.ResultHealth, er
return result, nil
}

func (c *baseRPCClient) BlockchainInfo(
ctx context.Context,
minHeight,
maxHeight int64,
) (*coretypes.ResultBlockchainInfo, error) {
func (c *baseRPCClient) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*coretypes.ResultBlockchainInfo, error) {
result := new(coretypes.ResultBlockchainInfo)
if err := c.caller.Call(ctx, "blockchain", blockchainInfoArgs{
MinHeight: minHeight,
Expand Down Expand Up @@ -403,10 +371,7 @@ func (c *baseRPCClient) BlockByHash(ctx context.Context, hash bytes.HexBytes) (*
return result, nil
}

func (c *baseRPCClient) BlockResults(
ctx context.Context,
height *int64,
) (*coretypes.ResultBlockResults, error) {
func (c *baseRPCClient) BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error) {
result := new(coretypes.ResultBlockResults)
if err := c.caller.Call(ctx, "block_results", heightArgs{Height: height}, result); err != nil {
return nil, err
Expand Down Expand Up @@ -446,14 +411,7 @@ func (c *baseRPCClient) Tx(ctx context.Context, hash bytes.HexBytes, prove bool)
return result, nil
}

func (c *baseRPCClient) TxSearch(
ctx context.Context,
query string,
prove bool,
page,
perPage *int,
orderBy string,
) (*coretypes.ResultTxSearch, error) {
func (c *baseRPCClient) TxSearch(ctx context.Context, query string, prove bool, page, perPage *int, orderBy string) (*coretypes.ResultTxSearch, error) {
result := new(coretypes.ResultTxSearch)
if err := c.caller.Call(ctx, "tx_search", searchArgs{
Query: query,
Expand All @@ -468,12 +426,7 @@ func (c *baseRPCClient) TxSearch(
return result, nil
}

func (c *baseRPCClient) BlockSearch(
ctx context.Context,
query string,
page, perPage *int,
orderBy string,
) (*coretypes.ResultBlockSearch, error) {
func (c *baseRPCClient) BlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*coretypes.ResultBlockSearch, error) {
result := new(coretypes.ResultBlockSearch)
if err := c.caller.Call(ctx, "block_search", searchArgs{
Query: query,
Expand All @@ -487,12 +440,7 @@ func (c *baseRPCClient) BlockSearch(
return result, nil
}

func (c *baseRPCClient) Validators(
ctx context.Context,
height *int64,
page,
perPage *int,
) (*coretypes.ResultValidators, error) {
func (c *baseRPCClient) Validators(ctx context.Context, height *int64, page, perPage *int) (*coretypes.ResultValidators, error) {
result := new(coretypes.ResultValidators)
if err := c.caller.Call(ctx, "validators", validatorArgs{
Height: height,
Expand All @@ -504,10 +452,7 @@ func (c *baseRPCClient) Validators(
return result, nil
}

func (c *baseRPCClient) BroadcastEvidence(
ctx context.Context,
ev types.Evidence,
) (*coretypes.ResultBroadcastEvidence, error) {
func (c *baseRPCClient) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*coretypes.ResultBroadcastEvidence, error) {
result := new(coretypes.ResultBroadcastEvidence)
if err := c.caller.Call(ctx, "broadcast_evidence", evidenceArgs{
Evidence: coretypes.Evidence{Value: ev},
Expand Down
4 changes: 2 additions & 2 deletions rpc/client/http/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er
}

w.mtx.Lock()
defer w.mtx.Unlock()
info, ok := w.subscriptions[query]
if ok {
if info.id != "" {
delete(w.subscriptions, info.id)
}
delete(w.subscriptions, info.query)
}
w.mtx.Unlock()

return nil
}
Expand All @@ -129,8 +129,8 @@ func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error
}

w.mtx.Lock()
defer w.mtx.Unlock()
w.subscriptions = make(map[string]*wsSubscription)
w.mtx.Unlock()

return nil
}
Expand Down
52 changes: 20 additions & 32 deletions rpc/client/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@ func (c *Local) ABCIQuery(ctx context.Context, path string, data bytes.HexBytes)
return c.ABCIQueryWithOptions(ctx, path, data, rpcclient.DefaultABCIQueryOptions)
}

func (c *Local) ABCIQueryWithOptions(
ctx context.Context,
path string,
data bytes.HexBytes,
opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) {
func (c *Local) ABCIQueryWithOptions(ctx context.Context, path string, data bytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) {
return c.env.ABCIQuery(ctx, path, data, opts.Height, opts.Prove)
}

Expand Down Expand Up @@ -189,35 +185,19 @@ func (c *Local) Tx(ctx context.Context, hash bytes.HexBytes, prove bool) (*coret
return c.env.Tx(ctx, hash, prove)
}

func (c *Local) TxSearch(
ctx context.Context,
queryString string,
prove bool,
page,
perPage *int,
orderBy string,
) (*coretypes.ResultTxSearch, error) {
func (c *Local) TxSearch(ctx context.Context, queryString string, prove bool, page, perPage *int, orderBy string) (*coretypes.ResultTxSearch, error) {
return c.env.TxSearch(ctx, queryString, prove, page, perPage, orderBy)
}

func (c *Local) BlockSearch(
ctx context.Context,
queryString string,
page, perPage *int,
orderBy string,
) (*coretypes.ResultBlockSearch, error) {
func (c *Local) BlockSearch(ctx context.Context, queryString string, page, perPage *int, orderBy string) (*coretypes.ResultBlockSearch, error) {
return c.env.BlockSearch(ctx, queryString, page, perPage, orderBy)
}

func (c *Local) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*coretypes.ResultBroadcastEvidence, error) {
return c.env.BroadcastEvidence(ctx, coretypes.Evidence{Value: ev})
}

func (c *Local) Subscribe(
ctx context.Context,
subscriber,
queryString string,
capacity ...int) (out <-chan coretypes.ResultEvent, err error) {
func (c *Local) Subscribe(ctx context.Context, subscriber, queryString string, capacity ...int) (<-chan coretypes.ResultEvent, error) {
q, err := query.New(queryString)
if err != nil {
return nil, fmt.Errorf("failed to parse query: %w", err)
Expand Down Expand Up @@ -251,12 +231,7 @@ func (c *Local) Subscribe(
return outc, nil
}

func (c *Local) eventsRoutine(
ctx context.Context,
sub eventbus.Subscription,
subArgs pubsub.SubscribeArgs,
outc chan<- coretypes.ResultEvent,
) {
func (c *Local) eventsRoutine(ctx context.Context, sub eventbus.Subscription, subArgs pubsub.SubscribeArgs, outc chan<- coretypes.ResultEvent) {
qstr := subArgs.Query.String()
for {
msg, err := sub.Next(ctx)
Expand All @@ -271,17 +246,24 @@ func (c *Local) eventsRoutine(
}
continue
}
outc <- coretypes.ResultEvent{
select {
case outc <- coretypes.ResultEvent{
SubscriptionID: msg.SubscriptionID(),
Query: qstr,
Data: msg.Data(),
Events: msg.Events(),
}:
case <-ctx.Done():
return
}
}
}

// Try to resubscribe with exponential backoff.
func (c *Local) resubscribe(ctx context.Context, subArgs pubsub.SubscribeArgs) eventbus.Subscription {
timer := time.NewTimer(0)
defer timer.Stop()

attempts := 0
for {
if !c.IsRunning() {
Expand All @@ -294,7 +276,13 @@ func (c *Local) resubscribe(ctx context.Context, subArgs pubsub.SubscribeArgs) e
}

attempts++
time.Sleep((10 << uint(attempts)) * time.Millisecond) // 10ms -> 20ms -> 40ms
timer.Reset((10 << uint(attempts)) * time.Millisecond) // 10ms -> 20ms -> 40ms
select {
case <-timer.C:
continue
case <-ctx.Done():
return nil
}
}
}

Expand Down
27 changes: 8 additions & 19 deletions rpc/jsonrpc/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,7 @@ func DefaultConfig() *Config {

// Serve creates a http.Server and calls Serve with the given listener. It
// wraps handler to recover panics and limit the request body size.
func Serve(
ctx context.Context,
listener net.Listener,
handler http.Handler,
logger log.Logger,
config *Config,
) error {
func Serve(ctx context.Context, listener net.Listener, handler http.Handler, logger log.Logger, config *Config) error {
logger.Info(fmt.Sprintf("Starting RPC HTTP server on %s", listener.Addr()))
h := recoverAndLogHandler(MaxBytesHandler(handler, config.MaxBodyBytes), logger)
s := &http.Server{
Expand Down Expand Up @@ -83,19 +77,14 @@ func Serve(
// Serve creates a http.Server and calls ServeTLS with the given listener,
// certFile and keyFile. It wraps handler to recover panics and limit the
// request body size.
func ServeTLS(
ctx context.Context,
listener net.Listener,
handler http.Handler,
certFile, keyFile string,
logger log.Logger,
config *Config,
) error {
logger.Info(fmt.Sprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)",
listener.Addr(), certFile, keyFile))
h := recoverAndLogHandler(MaxBytesHandler(handler, config.MaxBodyBytes), logger)
func ServeTLS(ctx context.Context, listener net.Listener, handler http.Handler, certFile, keyFile string, logger log.Logger, config *Config) error {
logger.Info("Starting RPC HTTPS server",
"listenterAddr", listener.Addr(),
"certFile", certFile,
"keyFile", keyFile)

s := &http.Server{
Handler: h,
Handler: recoverAndLogHandler(MaxBytesHandler(handler, config.MaxBodyBytes), logger),
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
MaxHeaderBytes: config.MaxHeaderBytes,
Expand Down
13 changes: 2 additions & 11 deletions rpc/jsonrpc/server/ws_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ type WebsocketManager struct {

// NewWebsocketManager returns a new WebsocketManager that passes a map of
// functions, connection options and logger to new WS connections.
func NewWebsocketManager(
logger log.Logger,
funcMap map[string]*RPCFunc,
wsConnOptions ...func(*wsConnection),
) *WebsocketManager {
func NewWebsocketManager(logger log.Logger, funcMap map[string]*RPCFunc, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
return &WebsocketManager{
funcMap: funcMap,
Upgrader: websocket.Upgrader{
Expand Down Expand Up @@ -137,12 +133,7 @@ type wsConnection struct {
// description of how to configure ping period and pong wait time. NOTE: if the
// write buffer is full, pongs may be dropped, which may cause clients to
// disconnect. see https://github.com/gorilla/websocket/issues/97
func newWSConnection(
baseConn *websocket.Conn,
funcMap map[string]*RPCFunc,
logger log.Logger,
options ...func(*wsConnection),
) *wsConnection {
func newWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, logger log.Logger, options ...func(*wsConnection)) *wsConnection {
wsc := &wsConnection{
Logger: logger,
remoteAddr: baseConn.RemoteAddr().String(),
Expand Down

0 comments on commit 29e5fbc

Please sign in to comment.