Skip to content

Commit

Permalink
feat: rest proxy (#13)
Browse files Browse the repository at this point in the history
- move the seed updating logic to its own thing
- add rest nodes to the seed updater
- reused the previous proxy implementation for both rpc and rest calls 
- fixed status page to show both kinds of nodes
- fixed a possible nil access when response is nil (introduced in #12)

closes #9
  • Loading branch information
caarlos0 authored Aug 17, 2024
1 parent 6e56229 commit 6933a3c
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 128 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.pem

dist/
akash
44 changes: 23 additions & 21 deletions index.html
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,35 @@ <h1>Akash Proxy</h1>
<table>
<thead>
<tr>
<th>Name</th>
<th>URL</th>
<th>Average response time</th>
<th>Server</th>
<th>Request Count</th>
<th>Avg response time</th>
<th>Error Rate</th>
<th>Status</th>
<th>Kind</th>
</tr>
</thead>
<!-- prettier-ignore -->
<tbody>
{{ range .}}
<tr>
<th>{{.Name}}</th>
<th><a href="{{.URL}}">{{.URL}}</a></th>
<th>{{.Avg}}</th>
<th>{{.Requests}}</th>
<th>{{.ErrorRate}}%</th>
<th>
<!-- prettier-ignore -->
{{ if not .Initialized}}
initializing
{{ else if .Degraded }}
degraded
{{else}}
OK
{{end}}
</th>
</tr>
{{ range $key, $value := . }}
{{ range $value }}
<tr>
<th><a href="{{ .URL }}">{{ .Name }}</a></th>
<th>{{ .Requests }}</th>
<th>{{ .Avg }}</th>
<th>{{ .ErrorRate }}%</th>
<th>
{{ if not .Initialized }}
initializing
{{ else if .Degraded }}
degraded
{{ else }}
OK
{{ end }}
</th>
<th>{{ $key }}</th>
</tr>
{{ end }}
{{ end }}
</tbody>
</table>
Expand Down
78 changes: 47 additions & 31 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,38 @@ import (
"net/http"
"slices"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/akash-network/rpc-proxy/internal/config"
"github.com/akash-network/rpc-proxy/internal/seed"
)

func New(cfg config.Config) *Proxy {
type ProxyKind uint8

const (
RPC ProxyKind = iota
Rest ProxyKind = iota
)

func New(
kind ProxyKind,
ch chan seed.Seed,
cfg config.Config,
) *Proxy {
return &Proxy{
cfg: cfg,
cfg: cfg,
ch: ch,
kind: kind,
}
}

type Proxy struct {
cfg config.Config
kind ProxyKind
init sync.Once
ch chan seed.Seed

round int
mu sync.Mutex
Expand Down Expand Up @@ -61,10 +76,16 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

switch p.kind {
case RPC:
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/rpc")
case Rest:
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/rest")
}

if srv := p.next(); srv != nil {
srv.ServeHTTP(w, r)
return

}
slog.Error("no servers available")
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -90,17 +111,30 @@ func (p *Proxy) next() *Server {
return p.next()
}

func (p *Proxy) update(rpcs []seed.RPC) error {
func (p *Proxy) update(seed seed.Seed) {
var err error
switch p.kind {
case RPC:
err = p.doUpdate(seed.APIs.RPC)
case Rest:
err = p.doUpdate(seed.APIs.Rest)
}
if err != nil {
slog.Error("could not update seed", "err", err)
}
}

func (p *Proxy) doUpdate(providers []seed.Provider) error {
p.mu.Lock()
defer p.mu.Unlock()

// add new servers
for _, rpc := range rpcs {
idx := slices.IndexFunc(p.servers, func(srv *Server) bool { return srv.name == rpc.Provider })
for _, provider := range providers {
idx := slices.IndexFunc(p.servers, func(srv *Server) bool { return srv.name == provider.Provider })
if idx == -1 {
srv, err := newServer(
rpc.Provider,
rpc.Address,
provider.Provider,
provider.Address,
p.cfg,
)
if err != nil {
Expand All @@ -112,8 +146,8 @@ func (p *Proxy) update(rpcs []seed.RPC) error {

// remove deleted servers
p.servers = slices.DeleteFunc(p.servers, func(srv *Server) bool {
for _, rpc := range rpcs {
if rpc.Provider == srv.name {
for _, provider := range providers {
if provider.Provider == srv.name {
return false
}
}
Expand All @@ -129,33 +163,15 @@ func (p *Proxy) update(rpcs []seed.RPC) error {
func (p *Proxy) Start(ctx context.Context) {
p.init.Do(func() {
go func() {
t := time.NewTicker(p.cfg.SeedRefreshInterval)
defer t.Stop()
for {
select {
case <-t.C:
p.fetchAndUpdate()
case seed := <-p.ch:
p.update(seed)
case <-ctx.Done():
p.shuttingDown.Store(true)
return
}
}
}()
p.fetchAndUpdate()
})
}

func (p *Proxy) fetchAndUpdate() {
result, err := seed.Fetch(p.cfg.SeedURL)
if err != nil {
slog.Error("could not get initial seed list", "err", err)
return
}
if result.ChainID != p.cfg.ChainID {
slog.Error("chain ID is different than expected", "got", result.ChainID, "expected", p.cfg.ChainID)
return
}
if err := p.update(result.Apis.RPC); err != nil {
slog.Error("could not update servers", "err", err)
}
}
107 changes: 55 additions & 52 deletions internal/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package proxy

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
Expand All @@ -17,53 +16,33 @@ import (
)

func TestProxy(t *testing.T) {
const chainID = "unittest"
for name, kind := range map[string]ProxyKind{
"rpc": RPC,
"rest": Rest,
} {
t.Run(name, func(t *testing.T) {
testProxy(t, kind)
})
}
}

func testProxy(tb testing.TB, kind ProxyKind) {
srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = io.WriteString(w, "srv1 replied")
}))
t.Cleanup(srv1.Close)
tb.Cleanup(srv1.Close)
srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(time.Millisecond * 500)
_, _ = io.WriteString(w, "srv2 replied")
}))
t.Cleanup(srv2.Close)
tb.Cleanup(srv2.Close)
srv3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusTeapot)
}))
t.Cleanup(srv2.Close)

seed := seed.Seed{
ChainID: chainID,
Apis: seed.Apis{
RPC: []seed.RPC{
{
Address: srv1.URL,
Provider: "srv1",
},
{
Address: srv2.URL,
Provider: "srv2",
},
{
Address: srv3.URL,
Provider: "srv3",
},
},
},
}

t.Logf("%+v", seed)

seedSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
bts, _ := json.Marshal(seed)
_, _ = w.Write(bts)
}))
t.Cleanup(seedSrv.Close)
tb.Cleanup(srv2.Close)

proxy := New(config.Config{
SeedURL: seedSrv.URL,
SeedRefreshInterval: 500 * time.Millisecond,
ChainID: chainID,
ch := make(chan seed.Seed, 1)
proxy := New(kind, ch, config.Config{
HealthyThreshold: 10 * time.Millisecond,
ProxyRequestTimeout: time.Second,
UnhealthyServerRecoverChancePct: 1,
Expand All @@ -72,19 +51,43 @@ func TestProxy(t *testing.T) {
})

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
tb.Cleanup(cancel)
proxy.Start(ctx)

require.Len(t, proxy.servers, 3)
serverList := []seed.Provider{
{
Address: srv1.URL,
Provider: "srv1",
},
{
Address: srv2.URL,
Provider: "srv2",
},
{
Address: srv3.URL,
Provider: "srv3",
},
}

ch <- seed.Seed{
APIs: seed.Apis{
Rest: serverList,
RPC: serverList,
},
}

require.Eventually(tb, func() bool { return proxy.initialized.Load() }, time.Second, time.Millisecond)

require.Len(tb, proxy.servers, 3)

proxySrv := httptest.NewServer(proxy)
t.Cleanup(proxySrv.Close)
tb.Cleanup(proxySrv.Close)

var wg errgroup.Group
wg.SetLimit(20)
for i := 0; i < 100; i++ {
wg.Go(func() error {
t.Log("go")
tb.Log("go")
req, err := http.NewRequest(http.MethodGet, proxySrv.URL, nil)
if err != nil {
return err
Expand All @@ -102,13 +105,13 @@ func TestProxy(t *testing.T) {
return nil
})
}
require.NoError(t, wg.Wait())
require.NoError(tb, wg.Wait())

// stop the proxy
cancel()

stats := proxy.Stats()
require.Len(t, stats, 3)
require.Len(tb, stats, 3)

var srv1Stats ServerStat
var srv2Stats ServerStat
Expand All @@ -124,13 +127,13 @@ func TestProxy(t *testing.T) {
srv3Stats = st
}
}
require.Zero(t, srv1Stats.ErrorRate)
require.Zero(t, srv2Stats.ErrorRate)
require.Equal(t, float64(100), srv3Stats.ErrorRate)
require.Greater(t, srv1Stats.Requests, srv2Stats.Requests)
require.Greater(t, srv2Stats.Avg, srv1Stats.Avg)
require.False(t, srv1Stats.Degraded)
require.True(t, srv2Stats.Degraded)
require.True(t, srv1Stats.Initialized)
require.True(t, srv2Stats.Initialized)
require.Zero(tb, srv1Stats.ErrorRate)
require.Zero(tb, srv2Stats.ErrorRate)
require.Equal(tb, float64(100), srv3Stats.ErrorRate)
require.Greater(tb, srv1Stats.Requests, srv2Stats.Requests)
require.Greater(tb, srv2Stats.Avg, srv1Stats.Avg)
require.False(tb, srv1Stats.Degraded)
require.True(tb, srv2Stats.Degraded)
require.True(tb, srv1Stats.Initialized)
require.True(tb, srv2Stats.Initialized)
}
Loading

0 comments on commit 6933a3c

Please sign in to comment.