Skip to content

Commit

Permalink
feat: add repo
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode committed Dec 7, 2023
1 parent 4dd17ab commit d0bc86d
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 65 deletions.
24 changes: 20 additions & 4 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,37 @@ package cli

import (
"bytes"
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"

"github.com/filecoin-project/go-jsonrpc"
local_api "github.com/ipfs-force-community/sophon-co/cli/api"
"github.com/ipfs-force-community/sophon-co/config"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
)

func NewLocalRPCClient(ctx context.Context, addr string, opts ...jsonrpc.Option) (local_api.LocalAPI, jsonrpc.ClientCloser, error) {
func NewLocalRPCClient(cctx *cli.Context, opts ...jsonrpc.Option) (local_api.LocalAPI, jsonrpc.ClientCloser, error) {
repoPath, err := homedir.Expand(cctx.String("repo"))
if err != nil {
return nil, nil, err
}
cfg, err := config.ReadConfig(filepath.Join(repoPath, config.ConfigFile))
if err != nil {
return nil, nil, err
}

addr := cfg.API.ListenAddress
if cctx.IsSet("listen") {
addr = cctx.String("listen")
}
port := strings.Split(addr, ":")[1]
endpoint := fmt.Sprintf("http://127.0.0.1:%s/rpc/admin/v0", port)

token, err := os.ReadFile("./token")
token, err := os.ReadFile(filepath.Join(repoPath, config.TokenFile))
token = bytes.TrimSpace(token)
if err != nil {
return nil, nil, err
Expand All @@ -26,7 +42,7 @@ func NewLocalRPCClient(ctx context.Context, addr string, opts ...jsonrpc.Option)
headers.Add("Authorization", "Bearer "+string(token))

var res local_api.LocalAPIStruct
closer, err := jsonrpc.NewMergeClient(ctx, endpoint, "Filecoin",
closer, err := jsonrpc.NewMergeClient(cctx.Context, endpoint, "Filecoin",
[]interface{}{
&res,
},
Expand Down
12 changes: 3 additions & 9 deletions cli/weight.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ var weightListCmd = &cli.Command{
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
ctx := cctx.Context
addr := cctx.String("listen")
client, closer, err := NewLocalRPCClient(ctx, addr)
client, closer, err := NewLocalRPCClient(cctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -67,17 +66,12 @@ var weightSetCmd = &cli.Command{
}

ctx := cctx.Context
listen := cctx.String("listen")
client, closer, err := NewLocalRPCClient(ctx, listen)
client, closer, err := NewLocalRPCClient(cctx)
if err != nil {
return err
}
defer closer()

err = client.SetWeight(ctx, node, weight)
if err != nil {
return err
}
return nil
return client.SetWeight(ctx, node, weight)
},
}
6 changes: 6 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func main() {
Usage: "listen address for the service",
Value: "0.0.0.0:1234",
},
&cli.StringFlag{
Name: "repo",
Usage: "sophon-co repo path",
Value: "~/.sophon-co",
EnvVars: []string{"SOPHON_CO_PATH"},
},
},

Commands: local,
Expand Down
22 changes: 11 additions & 11 deletions cmd/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,37 @@ import (
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v0api"
vapi "github.com/filecoin-project/venus/venus-shared/api"
"github.com/filecoin-project/venus/venus-shared/api/permission"
"github.com/ipfs-force-community/metrics"
"github.com/ipfs-force-community/metrics/ratelimit"
"github.com/ipfs-force-community/sophon-auth/core"
"github.com/ipfs-force-community/sophon-auth/jwtclient"
local_api "github.com/ipfs-force-community/sophon-co/cli/api"
"github.com/ipfs-force-community/sophon-co/config"
logging "github.com/ipfs/go-log/v2"
"go.opencensus.io/plugin/ochttp"
)

func serveRPC(ctx context.Context, authApi vapi.APIInfo, rateLimitRedis, listen string, mCnf *metrics.TraceConfig, jwt jwtclient.IJwtAuthClient, full api.FullNode, localApi local_api.LocalAPI, stop dix.StopFunc, maxRequestSize int64) error {
func serveRPC(ctx context.Context, cfg *config.Config, listen string, jwt jwtclient.IJwtAuthClient, full api.FullNode, localApi local_api.LocalAPI, stop dix.StopFunc, maxRequestSize int64) error {
serverOptions := []jsonrpc.ServerOption{}
if maxRequestSize > 0 {
serverOptions = append(serverOptions, jsonrpc.WithMaxRequestSize(maxRequestSize))
}

var remoteJwtCli *jwtclient.AuthClient
if len(authApi.Addr) > 0 {
if len(authApi.Token) == 0 {
if len(cfg.Auth.URL) > 0 {
if len(cfg.Auth.Token) == 0 {
return fmt.Errorf("auth token is need when auth api is set")
}
remoteJwtCli, _ = jwtclient.NewAuthClient(authApi.Addr, string(authApi.Token))
remoteJwtCli, _ = jwtclient.NewAuthClient(cfg.Auth.URL, string(cfg.Auth.Token))
}

pma := new(api.FullNodeStruct)
permission.PermissionProxy(full, pma)
if len(rateLimitRedis) > 0 && remoteJwtCli != nil {
log.Infof("use rate limit %s", rateLimitRedis)
if len(cfg.RateLimit.Redis) > 0 && remoteJwtCli != nil {
log.Infof("use rate limit %s", cfg.RateLimit.Redis)
limiter, err := ratelimit.NewRateLimitHandler(
rateLimitRedis,
cfg.RateLimit.Redis,
nil, &core.ValueFromCtx{},
jwtclient.WarpLimitFinder(remoteJwtCli),
logging.Logger("rate-limit"))
Expand Down Expand Up @@ -87,10 +87,10 @@ func serveRPC(ctx context.Context, authApi vapi.APIInfo, rateLimitRedis, listen

allHandler := (http.Handler)(mux)

if reporter, err := metrics.SetupJaegerTracing(mCnf.ServerName, mCnf); err != nil {
log.Fatalf("register %s JaegerRepoter to %s failed:%s", mCnf.ServerName, mCnf.JaegerEndpoint, err)
if reporter, err := metrics.SetupJaegerTracing(cfg.Trace.ServerName, cfg.Trace); err != nil {
return fmt.Errorf("register %s JaegerReporter to %s failed: %s", cfg.Trace.ServerName, cfg.Trace.JaegerEndpoint, err)
} else if reporter != nil {
log.Infof("register jaeger-tracing exporter to %s, with node-name:%s", mCnf.JaegerEndpoint, mCnf.ServerName)
log.Infof("register jaeger-tracing exporter to %s, with node-name: %s", cfg.Trace.JaegerEndpoint, cfg.Trace.ServerName)
defer metrics.ShutdownJaeger(ctx, reporter) //nolint:errcheck
allHandler = &ochttp.Handler{Handler: allHandler}
}
Expand Down
127 changes: 88 additions & 39 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package main

import (
"context"
"net"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/ipfs-force-community/metrics"
"github.com/mitchellh/go-homedir"

"github.com/urfave/cli/v2"

"github.com/filecoin-project/lotus/api/v1api"
vapi "github.com/filecoin-project/venus/venus-shared/api"
"github.com/ipfs-force-community/metrics"
local_api "github.com/ipfs-force-community/sophon-co/cli/api"
"github.com/ipfs-force-community/sophon-co/config"

"github.com/ipfs-force-community/sophon-auth/jwtclient"
"github.com/ipfs-force-community/sophon-co/dep"
Expand Down Expand Up @@ -56,25 +58,50 @@ var runCmd = &cli.Command{
},
&cli.StringFlag{
Name: "trace-node-name",
Value: "venus-node-co",
},
&cli.StringFlag{
Name: "metrics-endpoint",
Value: metrics.DefaultMetricsConfig().Exporter.Prometheus.EndPoint,
Value: "sophon-co",
},
},
Action: func(cctx *cli.Context) error {
appCtx, appCancel := context.WithCancel(cctx.Context)
defer appCancel()

repoPath, err := homedir.Expand(cctx.String("repo"))
if err != nil {
return err
}
has, err := hasRepo(repoPath)
if err != nil {
return err
}
cfg := config.DefaultConfig()
if has {
cfg, err = config.ReadConfig(filepath.Join(repoPath, config.ConfigFile))
if err != nil {
return err
}
}

if err := parseFlag(cctx, cfg); err != nil {
return err
}

if !has {
if err := os.MkdirAll(repoPath, 0o755); err != nil {
return err
}
if err := config.WriteConfig(filepath.Join(repoPath, config.ConfigFile), cfg); err != nil {
return err
}
}

var full v1api.FullNode
var localApi local_api.LocalAPI

localJwt, token, err := jwtclient.NewLocalAuthClient()
if err != nil {
return err
}
err = os.WriteFile("./token", token, 0o666)
err = os.WriteFile(filepath.Join(repoPath, config.TokenFile), token, 0o666)
if err != nil {
return err
}
Expand All @@ -84,7 +111,7 @@ var runCmd = &cli.Command{
dep.MetricsCtxOption(appCtx, cliName),

dep.APIVersionOption(cctx.String("version")),
service.ParseNodeInfoList(cctx.StringSlice("node"), cctx.String("version")),
service.ParseNodeInfoList(config.NodeList(cfg.Nodes), cctx.String("version")),
service.FullNode(&full),
service.LocalAPI(&localApi),
)
Expand All @@ -94,42 +121,15 @@ var runCmd = &cli.Command{

defer stop(context.Background()) // nolint:errcheck

jCnf := &metrics.TraceConfig{}
proxy, sampler, serverName := strings.TrimSpace(cctx.String("jaeger-proxy")),
cctx.Float64("trace-sampler"),
strings.TrimSpace(cctx.String("trace-node-name"))

if jCnf.JaegerTracingEnabled = len(proxy) != 0; jCnf.JaegerTracingEnabled {
jCnf.ProbabilitySampler, jCnf.JaegerEndpoint, jCnf.ServerName = sampler, proxy, serverName
}

mCfg := metrics.DefaultMetricsConfig()
mCfg.Enabled = true
mCfg.Exporter.Prometheus.Namespace = "chain_co"
mCfg.Exporter.Graphite.Namespace = "chain_co"
if cctx.IsSet("metrics-endpoint") {
mCfg.Exporter.Prometheus.EndPoint = cctx.String("metrics-endpoint")
addr, err := net.ResolveTCPAddr("tcp", cctx.String("metrics-endpoint"))
if err != nil {
return err
}
mCfg.Exporter.Graphite.Host = addr.IP.String()
mCfg.Exporter.Graphite.Port = addr.Port
}

err = metrics.SetupMetrics(appCtx, mCfg)
err = metrics.SetupMetrics(appCtx, cfg.Metrics)
if err != nil {
return err
}

authApi := vapi.ParseApiInfo(cctx.String("auth"))

return serveRPC(
appCtx,
authApi,
cctx.String("rate-limit-redis"),
cfg,
cctx.String("listen"),
jCnf,
localJwt,
full,
localApi,
Expand All @@ -142,3 +142,52 @@ var runCmd = &cli.Command{
)
},
}

func hasRepo(repoPath string) (bool, error) {
_, err := os.Stat(repoPath)
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
return true, nil
}

func parseFlag(cctx *cli.Context, cfg *config.Config) error {
if cctx.IsSet("listen") {
cfg.API.ListenAddress = cctx.String("listen")
}
if cctx.IsSet("auth") {
urlToken := strings.SplitN(cctx.String("auth"), ":", 2)
if len(urlToken) != 2 {
return fmt.Errorf("invalid auth: %v", cctx.String("auth"))
}
cfg.Auth.Token = urlToken[0]
cfg.Auth.URL = urlToken[1]
}
if cctx.IsSet("rate-limit-redis") {
cfg.RateLimit.Redis = cctx.String("rate-limit-redis")
}
if cctx.IsSet("jaeger-proxy") {
cfg.Trace.JaegerEndpoint = cctx.String("jaeger-proxy")
cfg.Trace.JaegerTracingEnabled = true
}
if cctx.IsSet("trace-sampler") {
cfg.Trace.ProbabilitySampler = cctx.Float64("trace-sampler")
}
if cctx.IsSet("trace-node-name") {
cfg.Trace.ServerName = cctx.String("trace-node-name")
}
if cctx.IsSet("node") {
var nodes []config.NodeConfig
for _, node := range cctx.StringSlice("node") {
nodes = append(nodes, config.NodeConfig{
TokenURL: node,
})
}
cfg.Nodes = nodes
}

return nil
}
Loading

0 comments on commit d0bc86d

Please sign in to comment.