diff --git a/core/eth/state.go b/core/eth/state.go index 4b667c621d..ba1a140ac5 100644 --- a/core/eth/state.go +++ b/core/eth/state.go @@ -10,6 +10,12 @@ type ChainState struct { Tx core.Transactor } +func NewChainState(tx core.Transactor) *ChainState { + return &ChainState{ + Tx: tx, + } +} + var _ core.ChainState = (*ChainState)(nil) func (cs *ChainState) GetOperatorStateByOperator(blockNumber uint, operator core.OperatorId) (*core.OperatorState, error) { diff --git a/disperser/batcher.go b/disperser/batcher.go index cae7a5e078..9b4a9e7811 100644 --- a/disperser/batcher.go +++ b/disperser/batcher.go @@ -30,7 +30,13 @@ type QuorumInfo struct { QuorumThreshold uint8 } +type BatcherConfig struct { + PullInterval time.Duration +} + type Batcher struct { + BatcherConfig + Queue BlobStore Client Dispatcher Confirmer BatchConfirmer @@ -41,11 +47,10 @@ type Batcher struct { Aggregator core.SignatureAggregator Logger common.Logger - - pullInterval time.Duration } func NewBatcher( + config BatcherConfig, queue BlobStore, client Dispatcher, confirmer BatchConfirmer, @@ -54,10 +59,10 @@ func NewBatcher( encoder core.Encoder, aggregator core.SignatureAggregator, logger common.Logger, - pullInterval time.Duration, ) *Batcher { return &Batcher{ + BatcherConfig: config, Queue: queue, Client: client, Confirmer: confirmer, @@ -66,7 +71,6 @@ func NewBatcher( Encoder: encoder, Aggregator: aggregator, Logger: logger, - pullInterval: pullInterval, } } @@ -78,7 +82,7 @@ func (b *Batcher) Start(ctx context.Context) { log := b.Logger - ticker := time.NewTicker(b.pullInterval) + ticker := time.NewTicker(b.PullInterval) defer ticker.Stop() for { diff --git a/disperser/batcher_test.go b/disperser/batcher_test.go index e2581786f8..713f8dee94 100644 --- a/disperser/batcher_test.go +++ b/disperser/batcher_test.go @@ -91,8 +91,12 @@ var _ = Describe("Integration", func() { key2, err := queue.AddBlob(&blob) Expect(err).To(BeNil()) + config := disperser.BatcherConfig{ + PullInterval: time.Second, + } + // Make the batcher - batcher := disperser.NewBatcher(queue, dispatcher, confirmer, cst, asgn, enc, agg, logger, time.Second) + batcher := disperser.NewBatcher(config, queue, dispatcher, confirmer, cst, asgn, enc, agg, logger) // Start the batcher batcher.Start(ctx) diff --git a/disperser/cmd/basic/config.go b/disperser/cmd/basic/config.go new file mode 100644 index 0000000000..86ca4c888e --- /dev/null +++ b/disperser/cmd/basic/config.go @@ -0,0 +1,36 @@ +package main + +import ( + "github.com/Layr-Labs/eigenda/common/geth" + "github.com/Layr-Labs/eigenda/common/logging" + "github.com/Layr-Labs/eigenda/core/encoding" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/disperser/grpc" + "github.com/urfave/cli" +) + +type Config struct { + BatcherConfig disperser.BatcherConfig + EthClientConfig geth.EthClientConfig + EncoderConfig encoding.EncoderConfig + LoggerConfig logging.Config + ServerConfig grpc.ServerConfig + + Address string +} + +func NewConfig(ctx *cli.Context) Config { + config := Config{ + EthClientConfig: geth.ReadEthClientConfig(ctx), + EncoderConfig: encoding.ReadCLIConfig(ctx), + LoggerConfig: logging.ReadCLIConfig(ctx), + ServerConfig: grpc.ServerConfig{ + GrpcPort: ctx.GlobalString(grpcPortFlag.Name), + }, + BatcherConfig: disperser.BatcherConfig{ + PullInterval: ctx.GlobalDuration(pullIntervalFlag.Name), + }, + Address: ctx.GlobalString(addressFlag.Name), + } + return config +} diff --git a/disperser/cmd/basic/disperser.go b/disperser/cmd/basic/disperser.go new file mode 100644 index 0000000000..a6845d08a9 --- /dev/null +++ b/disperser/cmd/basic/disperser.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/encoding" + "github.com/Layr-Labs/eigenda/core/eth" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/disperser/grpc" + "github.com/Layr-Labs/eigenda/disperser/inmem" +) + +type Disperser struct { + Batcher *disperser.Batcher + Server *grpc.DispersalServer +} + +func NewDisperser(config Config, logger common.Logger) (*Disperser, error) { + + queue := inmem.NewBlobStore() + + client := &grpc.Dispatcher{} + agg := &core.StdSignatureAggregator{} + asgn := &core.StdAssignmentCoordinator{} + + enc, err := encoding.NewEncoder(config.EncoderConfig) + if err != nil { + return nil, err + } + + tx, err := eth.NewTransactor(logger, config.EthClientConfig, config.Address) + if err != nil { + return nil, err + } + + cst := eth.NewChainState(tx) + + batcher := disperser.NewBatcher(config.BatcherConfig, queue, client, nil, cst, asgn, enc, agg, logger) + + server := grpc.NewDispersalServer(config.ServerConfig, queue, logger) + + return &Disperser{ + Batcher: batcher, + Server: server, + }, nil +} + +func (d *Disperser) Start(ctx context.Context) error { + + d.Batcher.Start(ctx) + + d.Server.Start(ctx) + + return nil +} diff --git a/disperser/cmd/basic/flags.go b/disperser/cmd/basic/flags.go new file mode 100644 index 0000000000..b0a1bd0a26 --- /dev/null +++ b/disperser/cmd/basic/flags.go @@ -0,0 +1,51 @@ +package main + +import ( + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/geth" + "github.com/Layr-Labs/eigenda/common/logging" + "github.com/Layr-Labs/eigenda/core/encoding" + "github.com/urfave/cli" +) + +const envVarPrefix = "DISPERSER" + +var ( + /* Required Flags */ + pullIntervalFlag = cli.DurationFlag{ + Name: "pull-interval", + Usage: "Interval at which to pull from the queue", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "PULL_INTERVAL"), + } + grpcPortFlag = cli.StringFlag{ + Name: "grpc-port", + Usage: "Port at which disperser listens for grpc calls", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "GRPC_PORT"), + } + addressFlag = cli.StringFlag{ + Name: "address", + Usage: "Address of the disperser", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ADDRESS"), + } +) + +var requiredFlags = []cli.Flag{ + pullIntervalFlag, + grpcPortFlag, + addressFlag, +} + +var optionalFlags = []cli.Flag{} + +// Flags contains the list of configuration options available to the binary. +var Flags []cli.Flag + +func init() { + Flags = append(requiredFlags, optionalFlags...) + Flags = append(Flags, encoding.CLIFlags(envVarPrefix)...) + Flags = append(Flags, geth.EthClientFlags(envVarPrefix)...) + Flags = append(Flags, logging.CLIFlags(envVarPrefix)...) +} diff --git a/disperser/cmd/basic/main.go b/disperser/cmd/basic/main.go new file mode 100644 index 0000000000..36c05f85c9 --- /dev/null +++ b/disperser/cmd/basic/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/node/flags" + "github.com/urfave/cli" +) + +var ( + // Version is the version of the binary. + Version string + GitCommit string + GitDate string +) + +func main() { + + app := cli.NewApp() + app.Flags = flags.Flags + app.Version = fmt.Sprintf("%s-%s-%s", Version, GitCommit, GitDate) + app.Name = "dl-disperser" + app.Usage = "DataLayr Disperser" + app.Description = "Service for encoding blobs and distributing coded chunks to nodes" + + app.Action = RunDisperser + err := app.Run(os.Args) + if err != nil { + log.Fatalln("Application failed.", "Message:", err) + } + + select {} +} + +func RunDisperser(ctx *cli.Context) error { + + config := NewConfig(ctx) + + var logger common.Logger = nil + + dis, err := NewDisperser(config, logger) + if err != nil { + return err + } + + err = dis.Start(context.Background()) + if err != nil { + return err + } + + return nil + +} diff --git a/disperser/grpc/client.go b/disperser/grpc/dispatcher.go similarity index 93% rename from disperser/grpc/client.go rename to disperser/grpc/dispatcher.go index a8406dbb27..0ddca2efb6 100644 --- a/disperser/grpc/client.go +++ b/disperser/grpc/dispatcher.go @@ -13,10 +13,21 @@ import ( "google.golang.org/grpc/credentials/insecure" ) +type DispatcherConfig struct { + Timeout time.Duration +} + type Dispatcher struct { + *DispatcherConfig + logger common.Logger +} - timeout time.Duration +func NewDispatcher(cfg *DispatcherConfig, logger common.Logger) *Dispatcher { + return &Dispatcher{ + DispatcherConfig: cfg, + logger: logger, + } } var _ disperser.Dispatcher = (*Dispatcher)(nil) @@ -76,7 +87,7 @@ func (c *Dispatcher) sendChunks(ctx context.Context, chunkBatch core.ChunkBatch, defer conn.Close() gc := node.NewNodeClient(conn) - ctx, cancel := context.WithTimeout(ctx, c.timeout) + ctx, cancel := context.WithTimeout(ctx, c.Timeout) defer cancel() request, err := getStoreChunksRequest(chunkBatch, header) diff --git a/disperser/grpc/server.go b/disperser/grpc/server.go index d958cf7bae..574f294625 100644 --- a/disperser/grpc/server.go +++ b/disperser/grpc/server.go @@ -22,7 +22,7 @@ type ServerConfig struct { type DispersalServer struct { pb.UnimplementedDisperserServer - config *ServerConfig + config ServerConfig blobQueue disperser.BlobStore @@ -32,7 +32,7 @@ type DispersalServer struct { // NewServer creates a new Server struct with the provided parameters. // // Note: The Server's chunks store will be created at config.DbPath+"/chunk". -func NewDispersalServer(config *ServerConfig, queue disperser.BlobStore, logger common.Logger) *DispersalServer { +func NewDispersalServer(config ServerConfig, queue disperser.BlobStore, logger common.Logger) *DispersalServer { return &DispersalServer{ config: config, blobQueue: queue, @@ -81,7 +81,7 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR } -func (s *DispersalServer) Start() { +func (s *DispersalServer) Start(ctx context.Context) { s.logger.Trace("Entering Start function...") defer s.logger.Trace("Exiting Start function...")