Skip to content

Commit

Permalink
Merge pull request #811 from textileio/jsign/chgmi
Browse files Browse the repository at this point in the history
OpenTelemetry metrics & Miner Index fix for document max-size limit & new deal-watcher
  • Loading branch information
jsign authored Mar 29, 2021
2 parents 4778f52 + 3eb5f34 commit 813687f
Show file tree
Hide file tree
Showing 42 changed files with 1,208 additions and 666 deletions.
54 changes: 28 additions & 26 deletions api/client/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,34 @@ func defaultServerConfig(t *testing.T) server.Config {

grpcMaddr := util.MustParseAddr(grpcHostAddress)
conf := server.Config{
WalletInitialFunds: *big.NewInt(int64(4000000000)),
IpfsAPIAddr: ipfsAddr,
LotusAddress: devnetAddr,
LotusAuthToken: "",
LotusMasterAddr: "",
LotusConnectionRetries: 5,
Devnet: true,
GrpcHostNetwork: grpcHostNetwork,
GrpcHostAddress: grpcMaddr,
GrpcWebProxyAddress: grpcWebProxyAddress,
RepoPath: repoPath,
GatewayHostAddr: gatewayHostAddr,
IndexRawJSONHostAddr: indexRawJSONHostAddr,
MaxMindDBFolder: "../../iplocation/maxmind",
MinerSelector: "reputation",
FFSDealFinalityTimeout: time.Minute * 30,
FFSMaxParallelDealPreparing: 1,
FFSGCAutomaticGCInterval: 0,
FFSRetrievalNextEventTimeout: time.Hour,
DealWatchPollDuration: time.Second * 15,
SchedMaxParallel: 10,
AskIndexQueryAskTimeout: time.Second * 3,
AskIndexRefreshInterval: time.Second * 3,
AskIndexRefreshOnStart: true,
AskindexMaxParallel: 2,
IndexMinersRefreshOnStart: false,
WalletInitialFunds: *big.NewInt(int64(4000000000)),
IpfsAPIAddr: ipfsAddr,
LotusAddress: devnetAddr,
LotusAuthToken: "",
LotusMasterAddr: "",
LotusConnectionRetries: 5,
Devnet: true,
GrpcHostNetwork: grpcHostNetwork,
GrpcHostAddress: grpcMaddr,
GrpcWebProxyAddress: grpcWebProxyAddress,
RepoPath: repoPath,
GatewayHostAddr: gatewayHostAddr,
IndexRawJSONHostAddr: indexRawJSONHostAddr,
MaxMindDBFolder: "../../iplocation/maxmind",
MinerSelector: "reputation",
FFSDealFinalityTimeout: time.Minute * 30,
FFSMaxParallelDealPreparing: 1,
FFSGCAutomaticGCInterval: 0,
FFSRetrievalNextEventTimeout: time.Hour,
DealWatchPollDuration: time.Second * 15,
SchedMaxParallel: 10,
AskIndexQueryAskTimeout: time.Second * 3,
AskIndexRefreshInterval: time.Second * 3,
AskIndexRefreshOnStart: true,
AskindexMaxParallel: 2,
IndexMinersRefreshOnStart: false,
IndexMinersOnChainMaxParallel: 1,
IndexMinersOnChainFrequency: time.Minute,
}
return conf
}
Expand Down
2 changes: 1 addition & 1 deletion api/server/admin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/textileio/powergate/v2/ffs/manager"
"github.com/textileio/powergate/v2/ffs/scheduler"
askIndex "github.com/textileio/powergate/v2/index/ask/runner"
minerIndex "github.com/textileio/powergate/v2/index/miner/module"
minerIndex "github.com/textileio/powergate/v2/index/miner/lotusidx"
"github.com/textileio/powergate/v2/wallet"
)

Expand Down
65 changes: 46 additions & 19 deletions api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/ipfs/go-datastore"
kt "github.com/ipfs/go-datastore/keytransform"
badger "github.com/ipfs/go-ds-badger2"
httpapi "github.com/ipfs/go-ipfs-http-client"
logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
measure "github.com/textileio/go-ds-measure"
mongods "github.com/textileio/go-ds-mongo"
adminPb "github.com/textileio/powergate/v2/api/gen/powergate/admin/v1"
userPb "github.com/textileio/powergate/v2/api/gen/powergate/user/v1"
Expand All @@ -44,7 +46,7 @@ import (
"github.com/textileio/powergate/v2/gateway"
ask "github.com/textileio/powergate/v2/index/ask/runner"
faultsModule "github.com/textileio/powergate/v2/index/faults/module"
minerModule "github.com/textileio/powergate/v2/index/miner/module"
minerIndex "github.com/textileio/powergate/v2/index/miner/lotusidx"
"github.com/textileio/powergate/v2/iplocation/maxmind"
"github.com/textileio/powergate/v2/lotus"
"github.com/textileio/powergate/v2/migration"
Expand Down Expand Up @@ -74,6 +76,7 @@ var (
2: migration.V2StorageInfoDealIDs,
3: migration.V3StorageJobsIndexMigration,
4: migration.V4RecordsMigration,
5: migration.V5DeleteOldMinerIndex,
}
)

Expand All @@ -83,7 +86,7 @@ type Server struct {

mm *maxmind.MaxMind
ai *ask.Runner
mi *minerModule.Index
mi *minerIndex.Index
fi *faultsModule.Index
dm *dealsModule.Module
wm *lotusWallet.Module
Expand Down Expand Up @@ -146,7 +149,9 @@ type Config struct {
AskIndexRefreshInterval time.Duration
AskIndexRefreshOnStart bool

IndexMinersRefreshOnStart bool
IndexMinersRefreshOnStart bool
IndexMinersOnChainMaxParallel int
IndexMinersOnChainFrequency time.Duration

DisableIndices bool

Expand Down Expand Up @@ -214,32 +219,47 @@ func NewServer(conf Config) (*Server, error) {
if err != nil {
return nil, fmt.Errorf("opening maxmind database: %s", err)
}
askConf := ask.Config{
askIdxConf := ask.Config{
Disable: conf.DisableIndices,
QueryAskTimeout: conf.AskIndexQueryAskTimeout,
MaxParallel: conf.AskindexMaxParallel,
RefreshInterval: conf.AskIndexRefreshInterval,
RefreshOnStart: conf.Devnet || conf.AskIndexRefreshOnStart,
}
ai, err := ask.New(txndstr.Wrap(ds, "index/ask"), clientBuilder, askConf)
log.Info("Starting ask index...")
ai, err := ask.New(txndstr.Wrap(ds, "index/ask"), clientBuilder, askIdxConf)
if err != nil {
return nil, fmt.Errorf("creating ask index: %s", err)
}
mi, err := minerModule.New(txndstr.Wrap(ds, "index/miner"), clientBuilder, fchost, mm, conf.IndexMinersRefreshOnStart, conf.DisableIndices)

log.Info("Starting miner index...")
minerIdxConf := minerIndex.Config{
RefreshOnStart: conf.IndexMinersRefreshOnStart,
Disable: conf.DisableIndices,
OnChainMaxParallel: conf.IndexMinersOnChainMaxParallel,
OnChainFrequency: conf.IndexMinersOnChainFrequency,
}
mi, err := minerIndex.New(kt.Wrap(ds, kt.PrefixTransform{Prefix: datastore.NewKey("index/miner")}), clientBuilder, fchost, mm, minerIdxConf)
if err != nil {
return nil, fmt.Errorf("creating miner index: %s", err)
}

log.Info("Starting faults index...")
si, err := faultsModule.New(txndstr.Wrap(ds, "index/faults"), clientBuilder, conf.DisableIndices)
if err != nil {
return nil, fmt.Errorf("creating faults index: %s", err)
}
if conf.Devnet {
conf.DealWatchPollDuration = time.Second
}

log.Info("Starting deals module...")
dm, err := dealsModule.New(txndstr.Wrap(ds, "deals"), clientBuilder, conf.DealWatchPollDuration, conf.FFSDealFinalityTimeout, deals.WithImportPath(filepath.Join(conf.RepoPath, "imports")))
if err != nil {
return nil, fmt.Errorf("creating deal module: %s", err)
}

log.Info("Starting wallet module...")
wm, err := lotusWallet.New(clientBuilder, masterAddr, conf.WalletInitialFunds, conf.AutocreateMasterAddr, networkName)
if err != nil {
return nil, fmt.Errorf("creating wallet module: %s", err)
Expand Down Expand Up @@ -268,6 +288,7 @@ func NewServer(conf Config) (*Server, error) {
return nil, fmt.Errorf("creating coreipfs: %s", err)
}

log.Info("Starting FFS scheduler...")
var sr2rf func() (int, error)
if ms, ok := ms.(*sr2.MinerSelector); ok {
sr2rf = ms.GetReplicationFactor
Expand Down Expand Up @@ -521,6 +542,9 @@ func (s *Server) Close() {
if err := s.l.Close(); err != nil {
log.Errorf("closing joblogger: %s", err)
}
if err := s.dm.Close(); err != nil {
log.Errorf("closing deal module: %s", err)
}
if err := s.rm.Close(); err != nil {
log.Errorf("closing reputation module: %s", err)
}
Expand Down Expand Up @@ -549,6 +573,9 @@ func (s *Server) Close() {
}

func createDatastore(conf Config, longTimeout bool) (datastore.TxnDatastore, error) {
var ds datastore.TxnDatastore
var err error

if conf.MongoURI != "" {
log.Info("Opening Mongo database...")
mongoCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand All @@ -560,24 +587,24 @@ func createDatastore(conf Config, longTimeout bool) (datastore.TxnDatastore, err
if longTimeout {
opts = []mongods.Option{mongods.WithOpTimeout(time.Hour), mongods.WithTxnTimeout(time.Hour)}
}
ds, err := mongods.New(mongoCtx, conf.MongoURI, conf.MongoDB, opts...)
ds, err = mongods.New(mongoCtx, conf.MongoURI, conf.MongoDB, opts...)
if err != nil {
return nil, fmt.Errorf("opening mongo datastore: %s", err)
}
return ds, nil
} else {
log.Info("Opening badger database...")
path := filepath.Join(conf.RepoPath, datastoreFolderName)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return nil, fmt.Errorf("creating repo folder: %s", err)
}
opts := &badger.DefaultOptions
ds, err = badger.NewDatastore(path, opts)
if err != nil {
return nil, fmt.Errorf("opening badger datastore: %s", err)
}
}

log.Info("Opening badger database...")
path := filepath.Join(conf.RepoPath, datastoreFolderName)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return nil, fmt.Errorf("creating repo folder: %s", err)
}
opts := &badger.DefaultOptions
ds, err := badger.NewDatastore(path, opts)
if err != nil {
return nil, fmt.Errorf("opening badger datastore: %s", err)
}
return ds, nil
return measure.New("powergate.datastore", ds), nil
}

func getMinerSelector(conf Config, rm *reputation.Module, ai *ask.Runner, cb lotus.ClientBuilder) (ffs.MinerSelector, error) {
Expand Down
71 changes: 41 additions & 30 deletions cmd/powd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@ import (
"syscall"
"time"

"contrib.go.opencensus.io/exporter/prometheus"
_ "net/http/pprof"

logging "github.com/ipfs/go-log/v2"
homedir "github.com/mitchellh/go-homedir"
ma "github.com/multiformats/go-multiaddr"
"github.com/spf13/pflag"
"github.com/spf13/viper"
metricsOpenTelemetry "github.com/textileio/go-metrics-opentelemetry"
"github.com/textileio/powergate/v2/api/server"
"github.com/textileio/powergate/v2/buildinfo"
"github.com/textileio/powergate/v2/util"
"go.opencensus.io/plugin/runmetrics"
"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
)

var (
Expand Down Expand Up @@ -51,10 +57,10 @@ func main() {
log.Infof("starting powd:\n%s", buildinfo.Summary())

// Configuring Prometheus exporter.
closeInstr, err := setupInstrumentation()
if err != nil {
if err := setupInstrumentation(); err != nil {
log.Fatalf("starting instrumentation: %s", err)
}

confProtected := conf
if confProtected.MongoURI != "" {
confProtected.MongoURI = "<hidden>"
Expand All @@ -78,7 +84,6 @@ func main() {
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
<-ch
log.Info("Closing...")
closeInstr()
powd.Close()
if conf.Devnet {
if err := os.RemoveAll(conf.RepoPath); err != nil {
Expand Down Expand Up @@ -140,6 +145,8 @@ func configFromFlags() (server.Config, error) {
askIndexRefreshOnStart := config.GetBool("askindexrefreshonstart")
askIndexMaxParallel := config.GetInt("askindexmaxparallel")
indexMinersRefreshOnStart := config.GetBool("indexminersrefreshonstart")
indexMinersOnChainMaxParallel := config.GetInt("indexminersonchainmaxparallel")
indexMinersOnChainFrequency := config.GetDuration("indexminersonchainfrequency")
disableIndices := config.GetBool("disableindices")
disableNonCompliantAPIs := config.GetBool("disablenoncompliantapis")

Expand Down Expand Up @@ -186,45 +193,45 @@ func configFromFlags() (server.Config, error) {
AskIndexRefreshOnStart: askIndexRefreshOnStart,
AskindexMaxParallel: askIndexMaxParallel,

IndexMinersRefreshOnStart: indexMinersRefreshOnStart,
IndexMinersRefreshOnStart: indexMinersRefreshOnStart,
IndexMinersOnChainMaxParallel: indexMinersOnChainMaxParallel,
IndexMinersOnChainFrequency: indexMinersOnChainFrequency,

DisableIndices: disableIndices,

DisableNonCompliantAPIs: disableNonCompliantAPIs,
}, nil
}

func setupInstrumentation() (func(), error) {
err := runmetrics.Enable(runmetrics.RunMetricOptions{
EnableCPU: true,
EnableMemory: true,
func setupInstrumentation() error {
exporter, err := prometheus.InstallNewPipeline(prometheus.Config{
DefaultHistogramBoundaries: []float64{1e-4, 1e-3, 1e-2, 1e-1, 1},
})
if err != nil {
return nil, fmt.Errorf("enabling runtime metrics: %s", err)
log.Panicf("failed to initialize prometheus exporter %v", err)
}
pe, err := prometheus.NewExporter(prometheus.Options{
Namespace: "textilefc",
})
if err != nil {
return nil, fmt.Errorf("creating the prometheus stats exporter: %v", err)
}
mux := http.NewServeMux()
mux.Handle("/metrics", pe)
srv := &http.Server{Addr: ":8888", Handler: mux}
http.HandleFunc("/metrics", exporter.ServeHTTP)
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Errorf("running prometheus scrape endpoint: %v", err)
}
_ = http.ListenAndServe(":8888", nil)
}()
closeFunc := func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Errorf("shutting down prometheus server: %s", err)
}

if err := metricsOpenTelemetry.Inject(); err != nil {
return fmt.Errorf("injecting datastore open-telemetry: %s", err)
}

if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second)); err != nil {
return fmt.Errorf("starting Go runtime metrics: %s", err)
}

return closeFunc, nil
meter := global.Meter("powergate")
attrBuildDate := attribute.Key("builddate").String(buildinfo.BuildDate)
attrGitSummary := attribute.Key("gitsummary").String(buildinfo.GitSummary)
attrGitBranch := attribute.Key("gitbranch").String(buildinfo.GitBranch)
attrGitCommit := attribute.Key("gitcommit").String(buildinfo.GitCommit)
metricInfo := metric.Must(meter).NewInt64Counter("powergate.info")
metricInfo.Add(context.Background(), 1, attrBuildDate, attrGitSummary, attrGitBranch, attrGitCommit)

return nil
}

func setupLogging(repoPath string) error {
Expand All @@ -243,13 +250,15 @@ func setupLogging(repoPath string) error {
"chainstore",
"fchost",
"maxmind",
"lotusidx-store",

// Lotus client
"lotus-client",

// Deals Module
"deals",
"deals-records",
"deals-watcher",

// Wallet Module
"lotus-wallet",
Expand Down Expand Up @@ -403,6 +412,8 @@ func setupFlags() error {
pflag.String("askindexmaxparallel", "3", "Max parallel query ask to execute while updating index.")

pflag.Bool("indexminersrefreshonstart", false, "If true it will refresh the miner's on start.")
pflag.Int64("indexminersonchainmaxparallel", 20, "Max parallelization for building on-chain sub-index")
pflag.Duration("indexminersonchainfrequency", time.Hour*6, "Frequency of updating on-chain sub-index")

pflag.Bool("disableindices", false, "Disable all indices updates, useful to help Lotus syncing process.")
pflag.Bool("disablenoncompliantapis", false, "Disable APIs that may not easily comply with US law.")
Expand Down
Loading

0 comments on commit 813687f

Please sign in to comment.