Skip to content

Commit

Permalink
Add basic disperser (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
mooselumph authored Jun 29, 2023
1 parent efae3d0 commit 117670a
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 11 deletions.
6 changes: 6 additions & 0 deletions core/eth/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ type ChainState struct {
Tx core.Transactor
}

func NewChainState(tx core.Transactor) *ChainState {
return &ChainState{
Tx: tx,
}
}

var _ core.ChainState = (*ChainState)(nil)

func (cs *ChainState) GetOperatorStateByOperator(blockNumber uint, operator core.OperatorId) (*core.OperatorState, error) {
Expand Down
14 changes: 9 additions & 5 deletions disperser/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ type QuorumInfo struct {
QuorumThreshold uint8
}

type BatcherConfig struct {
PullInterval time.Duration
}

type Batcher struct {
BatcherConfig

Queue BlobStore
Client Dispatcher
Confirmer BatchConfirmer
Expand All @@ -41,11 +47,10 @@ type Batcher struct {
Aggregator core.SignatureAggregator

Logger common.Logger

pullInterval time.Duration
}

func NewBatcher(
config BatcherConfig,
queue BlobStore,
client Dispatcher,
confirmer BatchConfirmer,
Expand All @@ -54,10 +59,10 @@ func NewBatcher(
encoder core.Encoder,
aggregator core.SignatureAggregator,
logger common.Logger,
pullInterval time.Duration,
) *Batcher {

return &Batcher{
BatcherConfig: config,
Queue: queue,
Client: client,
Confirmer: confirmer,
Expand All @@ -66,7 +71,6 @@ func NewBatcher(
Encoder: encoder,
Aggregator: aggregator,
Logger: logger,
pullInterval: pullInterval,
}
}

Expand All @@ -78,7 +82,7 @@ func (b *Batcher) Start(ctx context.Context) {

log := b.Logger

ticker := time.NewTicker(b.pullInterval)
ticker := time.NewTicker(b.PullInterval)
defer ticker.Stop()

for {
Expand Down
6 changes: 5 additions & 1 deletion disperser/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,12 @@ var _ = Describe("Integration", func() {
key2, err := queue.AddBlob(&blob)
Expect(err).To(BeNil())

config := disperser.BatcherConfig{
PullInterval: time.Second,
}

// Make the batcher
batcher := disperser.NewBatcher(queue, dispatcher, confirmer, cst, asgn, enc, agg, logger, time.Second)
batcher := disperser.NewBatcher(config, queue, dispatcher, confirmer, cst, asgn, enc, agg, logger)

// Start the batcher
batcher.Start(ctx)
Expand Down
36 changes: 36 additions & 0 deletions disperser/cmd/basic/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/common/logging"
"github.com/Layr-Labs/eigenda/core/encoding"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/grpc"
"github.com/urfave/cli"
)

type Config struct {
BatcherConfig disperser.BatcherConfig
EthClientConfig geth.EthClientConfig
EncoderConfig encoding.EncoderConfig
LoggerConfig logging.Config
ServerConfig grpc.ServerConfig

Address string
}

func NewConfig(ctx *cli.Context) Config {
config := Config{
EthClientConfig: geth.ReadEthClientConfig(ctx),
EncoderConfig: encoding.ReadCLIConfig(ctx),
LoggerConfig: logging.ReadCLIConfig(ctx),
ServerConfig: grpc.ServerConfig{
GrpcPort: ctx.GlobalString(grpcPortFlag.Name),
},
BatcherConfig: disperser.BatcherConfig{
PullInterval: ctx.GlobalDuration(pullIntervalFlag.Name),
},
Address: ctx.GlobalString(addressFlag.Name),
}
return config
}
57 changes: 57 additions & 0 deletions disperser/cmd/basic/disperser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/encoding"
"github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/grpc"
"github.com/Layr-Labs/eigenda/disperser/inmem"
)

type Disperser struct {
Batcher *disperser.Batcher
Server *grpc.DispersalServer
}

func NewDisperser(config Config, logger common.Logger) (*Disperser, error) {

queue := inmem.NewBlobStore()

client := &grpc.Dispatcher{}
agg := &core.StdSignatureAggregator{}
asgn := &core.StdAssignmentCoordinator{}

enc, err := encoding.NewEncoder(config.EncoderConfig)
if err != nil {
return nil, err
}

tx, err := eth.NewTransactor(logger, config.EthClientConfig, config.Address)
if err != nil {
return nil, err
}

cst := eth.NewChainState(tx)

batcher := disperser.NewBatcher(config.BatcherConfig, queue, client, nil, cst, asgn, enc, agg, logger)

server := grpc.NewDispersalServer(config.ServerConfig, queue, logger)

return &Disperser{
Batcher: batcher,
Server: server,
}, nil
}

func (d *Disperser) Start(ctx context.Context) error {

d.Batcher.Start(ctx)

d.Server.Start(ctx)

return nil
}
51 changes: 51 additions & 0 deletions disperser/cmd/basic/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/common/logging"
"github.com/Layr-Labs/eigenda/core/encoding"
"github.com/urfave/cli"
)

const envVarPrefix = "DISPERSER"

var (
/* Required Flags */
pullIntervalFlag = cli.DurationFlag{
Name: "pull-interval",
Usage: "Interval at which to pull from the queue",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "PULL_INTERVAL"),
}
grpcPortFlag = cli.StringFlag{
Name: "grpc-port",
Usage: "Port at which disperser listens for grpc calls",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GRPC_PORT"),
}
addressFlag = cli.StringFlag{
Name: "address",
Usage: "Address of the disperser",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ADDRESS"),
}
)

var requiredFlags = []cli.Flag{
pullIntervalFlag,
grpcPortFlag,
addressFlag,
}

var optionalFlags = []cli.Flag{}

// Flags contains the list of configuration options available to the binary.
var Flags []cli.Flag

func init() {
Flags = append(requiredFlags, optionalFlags...)
Flags = append(Flags, encoding.CLIFlags(envVarPrefix)...)
Flags = append(Flags, geth.EthClientFlags(envVarPrefix)...)
Flags = append(Flags, logging.CLIFlags(envVarPrefix)...)
}
57 changes: 57 additions & 0 deletions disperser/cmd/basic/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"
"fmt"
"log"
"os"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/node/flags"
"github.com/urfave/cli"
)

var (
// Version is the version of the binary.
Version string
GitCommit string
GitDate string
)

func main() {

app := cli.NewApp()
app.Flags = flags.Flags
app.Version = fmt.Sprintf("%s-%s-%s", Version, GitCommit, GitDate)
app.Name = "dl-disperser"
app.Usage = "DataLayr Disperser"
app.Description = "Service for encoding blobs and distributing coded chunks to nodes"

app.Action = RunDisperser
err := app.Run(os.Args)
if err != nil {
log.Fatalln("Application failed.", "Message:", err)
}

select {}
}

func RunDisperser(ctx *cli.Context) error {

config := NewConfig(ctx)

var logger common.Logger = nil

dis, err := NewDisperser(config, logger)
if err != nil {
return err
}

err = dis.Start(context.Background())
if err != nil {
return err
}

return nil

}
15 changes: 13 additions & 2 deletions disperser/grpc/client.go → disperser/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,21 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

type DispatcherConfig struct {
Timeout time.Duration
}

type Dispatcher struct {
*DispatcherConfig

logger common.Logger
}

timeout time.Duration
func NewDispatcher(cfg *DispatcherConfig, logger common.Logger) *Dispatcher {
return &Dispatcher{
DispatcherConfig: cfg,
logger: logger,
}
}

var _ disperser.Dispatcher = (*Dispatcher)(nil)
Expand Down Expand Up @@ -76,7 +87,7 @@ func (c *Dispatcher) sendChunks(ctx context.Context, chunkBatch core.ChunkBatch,
defer conn.Close()

gc := node.NewNodeClient(conn)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()

request, err := getStoreChunksRequest(chunkBatch, header)
Expand Down
6 changes: 3 additions & 3 deletions disperser/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type ServerConfig struct {
type DispersalServer struct {
pb.UnimplementedDisperserServer

config *ServerConfig
config ServerConfig

blobQueue disperser.BlobStore

Expand All @@ -32,7 +32,7 @@ type DispersalServer struct {
// NewServer creates a new Server struct with the provided parameters.
//
// Note: The Server's chunks store will be created at config.DbPath+"/chunk".
func NewDispersalServer(config *ServerConfig, queue disperser.BlobStore, logger common.Logger) *DispersalServer {
func NewDispersalServer(config ServerConfig, queue disperser.BlobStore, logger common.Logger) *DispersalServer {
return &DispersalServer{
config: config,
blobQueue: queue,
Expand Down Expand Up @@ -81,7 +81,7 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR

}

func (s *DispersalServer) Start() {
func (s *DispersalServer) Start(ctx context.Context) {
s.logger.Trace("Entering Start function...")
defer s.logger.Trace("Exiting Start function...")

Expand Down

0 comments on commit 117670a

Please sign in to comment.