diff --git a/disperser/cmd/encoder/config.go b/disperser/cmd/encoder/config.go index 69c0ff6400..4003adcf98 100644 --- a/disperser/cmd/encoder/config.go +++ b/disperser/cmd/encoder/config.go @@ -58,6 +58,8 @@ func NewConfig(ctx *cli.Context) (Config, error) { RequestPoolSize: ctx.GlobalInt(flags.RequestPoolSizeFlag.Name), EnableGnarkChunkEncoding: ctx.Bool(flags.EnableGnarkChunkEncodingFlag.Name), PreventReencoding: ctx.Bool(flags.PreventReencodingFlag.Name), + PprofHttpPort: ctx.GlobalString(flags.PprofHttpPort.Name), + EnablePprof: ctx.GlobalBool(flags.EnablePprof.Name), }, MetricsConfig: encoder.MetrisConfig{ HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name), diff --git a/disperser/cmd/encoder/flags/flags.go b/disperser/cmd/encoder/flags/flags.go index d14b45e04d..8c9399a399 100644 --- a/disperser/cmd/encoder/flags/flags.go +++ b/disperser/cmd/encoder/flags/flags.go @@ -73,6 +73,19 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(envVarPrefix, "PREVENT_REENCODING"), } + PprofHttpPort = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "pprof-http-port"), + Usage: "the http port which the pprof server is listening", + Required: false, + Value: "6060", + EnvVar: common.PrefixEnvVar(envVarPrefix, "PPROF_HTTP_PORT"), + } + EnablePprof = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "enable-pprof"), + Usage: "start prrof server", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_PPROF"), + } ) var requiredFlags = []cli.Flag{ @@ -88,6 +101,8 @@ var optionalFlags = []cli.Flag{ EncoderVersionFlag, S3BucketNameFlag, PreventReencodingFlag, + PprofHttpPort, + EnablePprof, } // Flags contains the list of configuration options available to the binary. diff --git a/disperser/encoder/config.go b/disperser/encoder/config.go index 8fcba36cd1..b543efe7b2 100644 --- a/disperser/encoder/config.go +++ b/disperser/encoder/config.go @@ -10,4 +10,6 @@ type ServerConfig struct { RequestPoolSize int EnableGnarkChunkEncoding bool PreventReencoding bool + PprofHttpPort string + EnablePprof bool } diff --git a/disperser/encoder/server.go b/disperser/encoder/server.go index 4eb0c39f61..f7f06e682c 100644 --- a/disperser/encoder/server.go +++ b/disperser/encoder/server.go @@ -10,12 +10,13 @@ import ( "time" "github.com/Layr-Labs/eigenda/common/healthcheck" + commonpprof "github.com/Layr-Labs/eigenda/common/pprof" "github.com/Layr-Labs/eigenda/disperser" pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder" - grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/Layr-Labs/eigenda/disperser/common" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) @@ -23,12 +24,12 @@ import ( type EncoderServer struct { pb.UnimplementedEncoderServer - config ServerConfig - logger logging.Logger - prover encoding.Prover - metrics *Metrics + config ServerConfig + logger logging.Logger + prover encoding.Prover + metrics *Metrics grpcMetrics *grpcprom.ServerMetrics - close func() + close func() runningRequests chan struct{} requestPool chan blobRequest @@ -46,10 +47,10 @@ func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encodin metrics.SetQueueCapacity(config.RequestPoolSize) return &EncoderServer{ - config: config, - logger: logger.With("component", "EncoderServer"), - prover: prover, - metrics: metrics, + config: config, + logger: logger.With("component", "EncoderServer"), + prover: prover, + metrics: metrics, grpcMetrics: grpcMetrics, runningRequests: make(chan struct{}, config.MaxConcurrentRequests), @@ -59,6 +60,12 @@ func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encodin } func (s *EncoderServer) Start() error { + pprofProfiler := commonpprof.NewPprofProfiler(s.config.PprofHttpPort, s.logger) + if s.config.EnablePprof { + go pprofProfiler.Start() + s.logger.Info("Enabled pprof for encoder server", "port", s.config.PprofHttpPort) + } + // Serve grpc requests addr := fmt.Sprintf("%s:%s", disperser.Localhost, s.config.GrpcPort) listener, err := net.Listen("tcp", addr) @@ -104,8 +111,6 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques blobSize := len(req.GetData()) sizeBucket := common.BlobSizeBucket(blobSize) - - select { case s.requestPool <- blobRequest{blobSizeByte: blobSize}: s.queueLock.Lock() diff --git a/node/config.go b/node/config.go index 7d73bdcb20..c5244e55bd 100644 --- a/node/config.go +++ b/node/config.go @@ -91,6 +91,9 @@ type Config struct { EnableV2 bool OnchainStateRefreshInterval time.Duration + + PprofHttpPort string + EnablePprof bool } // NewConfig parses the Config from the provided flags or environment variables and @@ -237,5 +240,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { BLSRemoteSignerEnabled: blsRemoteSignerEnabled, EnableV2: ctx.GlobalBool(flags.EnableV2Flag.Name), OnchainStateRefreshInterval: ctx.GlobalDuration(flags.OnchainStateRefreshIntervalFlag.Name), + PprofHttpPort: ctx.GlobalString(flags.PprofHttpPort.Name), + EnablePprof: ctx.GlobalBool(flags.EnablePprof.Name), }, nil } diff --git a/node/flags/flags.go b/node/flags/flags.go index b16e2709e9..40c1237a7d 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -313,6 +313,19 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(EnvVarPrefix, "BLS_SIGNER_CERT_FILE"), } + PprofHttpPort = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "pprof-http-port"), + Usage: "the http port which the pprof server is listening", + Required: false, + Value: "6060", + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "PPROF_HTTP_PORT"), + } + EnablePprof = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "enable-pprof"), + Usage: "start prrof server", + Required: false, + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_PPROF"), + } ) var requiredFlags = []cli.Flag{ @@ -361,6 +374,8 @@ var optionalFlags = []cli.Flag{ BLSSignerCertFileFlag, EnableV2Flag, OnchainStateRefreshIntervalFlag, + PprofHttpPort, + EnablePprof, } func init() { diff --git a/node/node.go b/node/node.go index 550228e9c9..07055e55cb 100644 --- a/node/node.go +++ b/node/node.go @@ -18,6 +18,7 @@ import ( "time" "github.com/Layr-Labs/eigenda/common/kvstore/tablestore" + "github.com/Layr-Labs/eigenda/common/pprof" "github.com/Layr-Labs/eigenda/common/pubip" "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" @@ -280,6 +281,11 @@ func NewNode( // Start starts the Node. If the node is not registered, register it on chain, otherwise just // update its socket on chain. func (n *Node) Start(ctx context.Context) error { + pprofProfiler := pprof.NewPprofProfiler(n.Config.PprofHttpPort, n.Logger) + if n.Config.EnablePprof { + go pprofProfiler.Start() + n.Logger.Info("Enabled pprof for Node", "port", n.Config.PprofHttpPort) + } if n.Config.EnableMetrics { n.Metrics.Start() n.Logger.Info("Enabled metrics", "socket", n.Metrics.socketAddr)