Skip to content

Commit

Permalink
WebSockets #37 (#64)
Browse files Browse the repository at this point in the history
* WebSockets #37

* copy context to a goroutine

* circleci: newer google-cloud-sdk

Co-authored-by: jmank88 <[email protected]>
  • Loading branch information
r-gochain and jmank88 authored Aug 23, 2021
1 parent 604731d commit 011eed2
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 80 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ jobs:
- run:
name: install gcloud
command: |
wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-204.0.0-linux-x86_64.tar.gz --directory-prefix=tmp
tar -xvzf tmp/google-cloud-sdk-204.0.0-linux-x86_64.tar.gz -C tmp
wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-353.0.0-linux-x86_64.tar.gz --directory-prefix=tmp
tar -xvzf tmp/google-cloud-sdk-353.0.0-linux-x86_64.tar.gz -C tmp
./tmp/google-cloud-sdk/install.sh -q
- setup_remote_docker
- deploy:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/go-chi/chi/v5 v5.0.3
github.com/gochain/gochain/v3 v3.4.7
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/pelletier/go-toml v1.9.3
github.com/rs/cors v1.8.0
github.com/treeder/gcputils v0.1.1
Expand Down
137 changes: 64 additions & 73 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,34 @@ func parseRequests(r *http.Request) (string, []string, []ModifiedRequest, error)
if err != nil {
return "", nil, nil, fmt.Errorf("failed to read body: %v", err)
}
type rpcRequest struct {
ID json.RawMessage `json:"id"`
Method string `json:"method"`
Params []json.RawMessage `json:"params"`
methods, res, err = parseMessage(body, ip)
if err != nil {
return "", nil, nil, err
}
if isBatch(body) {
var arr []rpcRequest
err = json.Unmarshal(body, &arr)
if err != nil {
return "", nil, nil, fmt.Errorf("failed to parse JSON batch request: %v", err)
}
for _, t := range arr {
methods = append(methods, t.Method)
res = append(res, ModifiedRequest{
ID: t.ID,
Path: t.Method,
RemoteAddr: ip,
Params: t.Params,
})
}
} else {
var t rpcRequest
err = json.Unmarshal(body, &t)
if err != nil {
return "", nil, nil, fmt.Errorf("failed to parse JSON request: %v", err)
}
}
if len(res) == 0 {
methods = append(methods, r.URL.Path)
res = append(res, ModifiedRequest{
Path: r.URL.Path,
RemoteAddr: ip,
})
}
return ip, methods, res, nil
}

func parseMessage(body []byte, ip string) (methods []string, res []ModifiedRequest, err error) {
type rpcRequest struct {
ID json.RawMessage `json:"id"`
Method string `json:"method"`
Params []json.RawMessage `json:"params"`
}
if isBatch(body) {
var arr []rpcRequest
err := json.Unmarshal(body, &arr)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse JSON batch request: %v", err)
}
for _, t := range arr {
methods = append(methods, t.Method)
res = append(res, ModifiedRequest{
ID: t.ID,
Expand All @@ -105,15 +107,21 @@ func parseRequests(r *http.Request) (string, []string, []ModifiedRequest, error)
Params: t.Params,
})
}
}
if len(res) == 0 {
methods = append(methods, r.URL.Path)
} else {
var t rpcRequest
err := json.Unmarshal(body, &t)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse JSON request: %v", err)
}
methods = append(methods, t.Method)
res = append(res, ModifiedRequest{
Path: r.URL.Path,
ID: t.ID,
Path: t.Method,
RemoteAddr: ip,
Params: t.Params,
})
}
return ip, methods, res, nil
return methods, res, nil
}

const (
Expand All @@ -123,16 +131,18 @@ const (
jsonRPCInternal = -32603
)

type ErrResponse struct {
Version string `json:"jsonrpc"`
ID json.RawMessage `json:"id"`
Error struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error"`
}

func jsonRPCError(id json.RawMessage, jsonCode int, msg string) interface{} {
type errResponse struct {
Version string `json:"jsonrpc"`
ID json.RawMessage `json:"id"`
Error struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error"`
}
resp := errResponse{

resp := ErrResponse{
Version: "2.0",
ID: id,
}
Expand Down Expand Up @@ -187,8 +197,13 @@ func (t *myTransport) RoundTrip(req *http.Request) (*http.Response, error) {

ctx = gotils.With(ctx, "remoteIp", ip)
ctx = gotils.With(ctx, "methods", methods)
if blockResponse := t.block(ctx, parsedRequests); blockResponse != nil {
return blockResponse, nil
errorCode, resp := t.block(ctx, parsedRequests)
if resp != nil {
resp, err := jsonRPCResponse(errorCode, resp)
if err != nil {
gotils.L(ctx).Error().Printf("Failed to construct a response: %v", err)
}
return resp, nil
}

gotils.L(ctx).Info().Print("Forwarding request")
Expand All @@ -197,71 +212,47 @@ func (t *myTransport) RoundTrip(req *http.Request) (*http.Response, error) {
}

// block returns a response only if the request should be blocked, otherwise it returns nil if allowed.
func (t *myTransport) block(ctx context.Context, parsedRequests []ModifiedRequest) *http.Response {
func (t *myTransport) block(ctx context.Context, parsedRequests []ModifiedRequest) (int, interface{}) {
var union *blockRange
for _, parsedRequest := range parsedRequests {
ctx = gotils.With(ctx, "ip", parsedRequest.RemoteAddr)
if allowed, added := t.AllowVisitor(parsedRequest); !allowed {
gotils.L(ctx).Info().Print("Request blocked: Rate limited")
resp, err := jsonRPCResponse(http.StatusTooManyRequests, jsonRPCLimit(parsedRequest.ID))
if err != nil {
gotils.L(ctx).Error().Printf("Failed to construct rate-limit response: %v", err)
}
return resp
return http.StatusTooManyRequests, jsonRPCLimit(parsedRequest.ID)
} else if added {
gotils.L(ctx).Info().Printf("Added new visitor, ip: %v", parsedRequest.RemoteAddr)
}

if !t.MatchAnyRule(parsedRequest.Path) {
gotils.L(ctx).Info().Print("Request blocked: Method not allowed")
resp, err := jsonRPCResponse(http.StatusMethodNotAllowed, jsonRPCUnauthorized(parsedRequest.ID, parsedRequest.Path))
if err != nil {
gotils.L(ctx).Error().Printf("Failed to construct not-allowed response: %v", err)
}
return resp
return http.StatusMethodNotAllowed, jsonRPCUnauthorized(parsedRequest.ID, parsedRequest.Path)
}
if t.blockRangeLimit > 0 && parsedRequest.Path == "eth_getLogs" {
r, invalid, err := t.parseRange(ctx, parsedRequest)
if err != nil {
resp, err := jsonRPCResponse(http.StatusInternalServerError, jsonRPCError(parsedRequest.ID, jsonRPCInternal, err.Error()))
if err != nil {
gotils.L(ctx).Error().Printf("Failed to construct internal error response: %v", err)
}
return resp
return http.StatusInternalServerError, jsonRPCError(parsedRequest.ID, jsonRPCInternal, err.Error())
} else if invalid != nil {
gotils.L(ctx).Info().Printf("Request blocked: Invalid params: %v", invalid)
resp, err := jsonRPCResponse(http.StatusBadRequest, jsonRPCError(parsedRequest.ID, jsonRPCInvalidParams, invalid.Error()))
if err != nil {
gotils.L(ctx).Error().Printf("Failed to construct invalid params response: %v", err)
}
return resp
return http.StatusBadRequest, jsonRPCError(parsedRequest.ID, jsonRPCInvalidParams, invalid.Error())
}
if r != nil {
if l := r.len(); l > t.blockRangeLimit {
gotils.L(ctx).Info().Println("Request blocked: Exceeds block range limit, range:", l, "limit:", t.blockRangeLimit)
resp, err := jsonRPCResponse(http.StatusBadRequest, jsonRPCBlockRangeLimit(parsedRequest.ID, l, t.blockRangeLimit))
if err != nil {
gotils.L(ctx).Error().Printf("Failed to construct block range limit response: %v", err)
}
return resp
return http.StatusBadRequest, jsonRPCBlockRangeLimit(parsedRequest.ID, l, t.blockRangeLimit)
}
if union == nil {
union = r
} else {
union.extend(r)
if l := union.len(); l > t.blockRangeLimit {
gotils.L(ctx).Info().Println("Request blocked: Exceeds block range limit, range:", l, "limit:", t.blockRangeLimit)
resp, err := jsonRPCResponse(http.StatusBadRequest, jsonRPCBlockRangeLimit(parsedRequest.ID, l, t.blockRangeLimit))
if err != nil {
gotils.L(ctx).Error().Printf("Failed to construct block range limit response: %v", err)
}
return resp
return http.StatusBadRequest, jsonRPCBlockRangeLimit(parsedRequest.ID, l, t.blockRangeLimit)
}
}
}
}
}
return nil
return 0, nil
}

type blockRange struct{ start, end uint64 }
Expand Down
17 changes: 16 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var requestsPerMinuteLimit int
type ConfigData struct {
Port string `toml:",omitempty"`
URL string `toml:",omitempty"`
WSURL string `toml:",omitempty"`
Allow []string `toml:",omitempty"`
RPM int `toml:",omitempty"`
NoLimit []string `toml:",omitempty"`
Expand All @@ -37,6 +38,7 @@ func main() {
var configPath string
var port string
var redirecturl string
var redirectWSUrl string
var allowedPaths string
var noLimitIPs string
var blockRangeLimit uint64
Expand Down Expand Up @@ -64,6 +66,12 @@ func main() {
Usage: "redirect url",
Destination: &redirecturl,
},
&cli.StringFlag{
Name: "wsurl, w",
Value: "ws://127.0.0.1:8041",
Usage: "redirect websocket url",
Destination: &redirectWSUrl,
},
&cli.StringFlag{
Name: "allow, a",
Usage: "comma separated list of allowed paths",
Expand Down Expand Up @@ -111,6 +119,12 @@ func main() {
}
cfg.URL = redirecturl
}
if redirectWSUrl != "" {
if cfg.WSURL != "" {
return errors.New("ws url set in two places")
}
cfg.WSURL = redirectWSUrl
}
if requestsPerMinuteLimit != 0 {
if cfg.RPM != 0 {
return errors.New("rpm set in two places")
Expand Down Expand Up @@ -150,7 +164,7 @@ func (cfg *ConfigData) run(ctx context.Context) error {
sort.Strings(cfg.Allow)
sort.Strings(cfg.NoLimit)

gotils.L(ctx).Info().Println("Server starting, port:", cfg.Port, "redirectURL:", cfg.URL,
gotils.L(ctx).Info().Println("Server starting, port:", cfg.Port, "redirectURL:", cfg.URL, "redirectWSURL:", cfg.WSURL,
"rpmLimit:", cfg.RPM, "exempt:", cfg.NoLimit, "allowed:", cfg.Allow)

// Create proxy server.
Expand Down Expand Up @@ -189,5 +203,6 @@ func (cfg *ConfigData) run(ctx context.Context) error {
w.WriteHeader(http.StatusOK)
})
r.HandleFunc("/*", server.RPCProxy)
r.HandleFunc("/ws", server.WSProxy)
return http.ListenAndServe(":"+cfg.Port, r)
}
18 changes: 14 additions & 4 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
)

type Server struct {
target *url.URL
proxy *httputil.ReverseProxy
target *url.URL
proxy *httputil.ReverseProxy
wsProxy *WebsocketProxy
myTransport
homepage []byte
}
Expand All @@ -32,8 +33,11 @@ func (cfg *ConfigData) NewServer() (*Server, error) {
if err != nil {
return nil, err
}

s := &Server{target: url, proxy: httputil.NewSingleHostReverseProxy(url)}
wsurl, err := url.Parse(cfg.WSURL)
if err != nil {
return nil, err
}
s := &Server{target: url, proxy: httputil.NewSingleHostReverseProxy(url), wsProxy: NewProxy(wsurl)}
s.myTransport.blockRangeLimit = cfg.BlockRangeLimit
s.myTransport.url = cfg.URL
s.matcher, err = newMatcher(cfg.Allow)
Expand All @@ -46,6 +50,7 @@ func (cfg *ConfigData) NewServer() (*Server, error) {
s.noLimitIPs[ip] = struct{}{}
}
s.proxy.Transport = &s.myTransport
s.wsProxy.Transport = &s.myTransport

// Generate static home page.
id := json.RawMessage([]byte(`"ID"`))
Expand Down Expand Up @@ -88,6 +93,11 @@ func (p *Server) RPCProxy(w http.ResponseWriter, r *http.Request) {
p.proxy.ServeHTTP(w, r)
}

func (p *Server) WSProxy(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-rpc-proxy", "rpc-proxy")
p.wsProxy.ServeHTTP(w, r)
}

func (p *Server) Example(w http.ResponseWriter, r *http.Request) {
method := chi.URLParam(r, "method")
args := []string{
Expand Down
Loading

0 comments on commit 011eed2

Please sign in to comment.