Skip to content

Commit

Permalink
chore: Update compose with secondary store setup && minor refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
ethenotethan committed Feb 1, 2025
1 parent 0e7dcaf commit 9aaabe3
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 20 deletions.
75 changes: 63 additions & 12 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,22 +1,81 @@
## The following is a proxy instance
## pointed to redis for storage caching and S3
## for storage failovers

services:
## Used as secondary read failover target
minio:
image: minio/minio:latest
container_name: minio
environment:
- MINIO_ROOT_USER=minioadmin
- MINIO_ROOT_PASSWORD=minioadmin
ports:
- "9000:9000"
- "9001:9001"
command: server /data
volumes:
- minio_data:/data

minio-init:
## Seed test bucket
image: minio/mc:latest
depends_on:
- minio
entrypoint: ["/bin/sh", "-c", "/usr/bin/create-bucket.sh"]
volumes:
- ./scripts/create-test-s3-bucket.sh:/usr/bin/create-bucket.sh

redis:
image: redis:latest
container_name: redis
command: redis-server --requirepass redispassword
environment:
- REDIS_PASSWORD=redispassword
ports:
- "6379:6379"

eigenda_proxy:
depends_on:
- minio-init
build:
context: .
dockerfile: Dockerfile
container_name: eigenda-proxy
environment:
- EIGENDA_PROXY_LOG_LEVEL=debug
- EIGENDA_PROXY_ADDR=0.0.0.0
- EIGENDA_PROXY_PORT=4242
- EIGENDA_PROXY_MEMSTORE_ENABLED=false
- EIGENDA_PROXY_MEMSTORE_ENABLED=true
- EIGENDA_PROXY_MEMSTORE_EXPIRATION=45m
- EIGENDA_PROXY_EIGENDA_SIGNER_PRIVATE_KEY_HEX=$PRIVATE_KEY
- EIGENDA_PROXY_EIGENDA_CERT_VERIFICATION_DISABLED=true
- EIGENDA_PROXY_EIGENDA_SIGNER_PRIVATE_KEY_HEX=${PRIVATE_KEY}
- EIGENDA_PROXY_EIGENDA_DISPERSER_RPC=disperser-holesky.eigenda.xyz:443
- EIGENDA_PROXY_EIGENDA_SERVICE_MANAGER_ADDR=0xD4A7E1Bd8015057293f0D0A557088c286942e84b
- EIGENDA_PROXY_EIGENDA_ETH_RPC=$ETH_RPC
- EIGENDA_PROXY_EIGENDA_ETH_RPC=https://ethereum-holesky-rpc.publicnode.com
- EIGENDA_PROXY_EIGENDA_ETH_CONFIRMATION_DEPTH=0
- EIGENDA_PROXY_METRICS_ADDR=0.0.0.0
- EIGENDA_PROXY_METRICS_ENABLED=true
- EIGENDA_PROXY_METRICS_PORT=7300
## S3
- EIGENDA_PROXY_S3_CREDENTIAL_TYPE=static
- EIGENDA_PROXY_S3_ACCESS_KEY_ID=minioadmin
- EIGENDA_PROXY_S3_ACCESS_KEY_SECRET=minioadmin
- EIGENDA_PROXY_S3_BUCKET=eigenda-proxy-test
- EIGENDA_PROXY_S3_PATH=""
- EIGENDA_PROXY_S3_ENDPOINT=minio:9000
- EIGENDA_PROXY_S3_ENABLE_TLS=false

# Redis Configuration
- EIGENDA_PROXY_REDIS_DB=0
- EIGENDA_PROXY_REDIS_ENDPOINT=redis:6379
- EIGENDA_PROXY_REDIS_PASSWORD=redispassword
- EIGENDA_PROXY_REDIS_EVICTION=24h0m0s

## Secondary routing
- EIGENDA_PROXY_STORAGE_FALLBACK_TARGETS=s3
- EIGENDA_PROXY_STORAGE_CACHE_TARGETS=redis

ports:
- 4242:4242
- 7300:7300
Expand Down Expand Up @@ -44,14 +103,6 @@ services:
depends_on:
- prometheus

traffic-generator:
image: alpine:latest
build: scripts/
container_name: traffic_generator
depends_on:
- eigenda_proxy
volumes:
- ./scripts/:/scripts/

volumes:
grafana-data:
minio_data:
15 changes: 15 additions & 0 deletions scripts/create-test-s3-bucket.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/sh

# Wait 2 seconds to ensure minio is finished bootstrapping
# TODO: Update this to do event based polling on minio server directly vs semi-arbitrary timeout
sleep 2s

# Configure MinIO client (mc)
echo "Configuring MinIO client..."
mc alias set local http://minio:9000 minioadmin minioadmin

# Ensure the bucket exists
echo "Creating bucket: eigenda-proxy-test..."
mc mb local/eigenda-proxy-test || echo "Bucket already exists."

echo "Bucket setup complete."
8 changes: 6 additions & 2 deletions server/load_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr
return nil, err
}

// create secondary storage router
// create secondary storage manager
fallbacks := populateTargets(cfg.EigenDAConfig.StorageConfig.FallbackTargets, s3Store, redisStore)
caches := populateTargets(cfg.EigenDAConfig.StorageConfig.CacheTargets, s3Store, redisStore)
secondary := store.NewSecondaryManager(log, m, caches, fallbacks)
Expand All @@ -135,6 +135,10 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr
}
}

log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil)
log.Info("Created storage backends",
"eigenda", eigenDA != nil,
"s3", s3Store != nil,
"redis", redisStore != nil,
)
return store.NewManager(eigenDA, s3Store, log, secondary)
}
3 changes: 1 addition & 2 deletions store/generated_key/memstore/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func (e *MemStore) pruningLoop(ctx context.Context) {
return

case <-timer.C:
e.l.Debug("pruning expired blobs")
e.pruneExpired()
}
}
Expand All @@ -99,7 +98,7 @@ func (e *MemStore) pruneExpired() {
delete(e.keyStarts, commit)
delete(e.store, commit)

e.l.Info("blob pruned", "commit", commit)
e.l.Debug("blob pruned", "commit", commit)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions store/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,13 @@ func (m *Manager) Put(ctx context.Context, cm commitments.CommitmentMode, key, v

// 2 - Put blob into secondary storage backends
if m.secondary.Enabled() && m.secondary.AsyncWriteEntry() { // publish put notification to secondary's subscription on PutNotify topic
m.log.Debug("Publishing data to async secondary stores")
m.secondary.Topic() <- PutNotify{
Commitment: commit,
Value: value,
}
} else if m.secondary.Enabled() && !m.secondary.AsyncWriteEntry() { // secondary is available only for synchronous writes
m.log.Debug("Publishing data to single threaded secondary stores")
err := m.secondary.HandleRedundantWrites(ctx, commit, value)
if err != nil {
m.log.Error("Secondary insertions failed", "error", err.Error())
Expand Down
9 changes: 5 additions & 4 deletions store/secondary.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type ISecondary interface {
WriteSubscriptionLoop(ctx context.Context)
}

// PutNotify ... notification received by primary router to perform insertion across
// PutNotify ... notification received by primary manager to perform insertion across
// secondary storage backends
type PutNotify struct {
Commitment []byte
Expand All @@ -53,7 +53,7 @@ type SecondaryManager struct {
concurrentWrites bool
}

// NewSecondaryManager ... creates a new secondary storage router
// NewSecondaryManager ... creates a new secondary storage manager
func NewSecondaryManager(log log.Logger, m metrics.Metricer, caches []common.PrecomputedKeyStore, fallbacks []common.PrecomputedKeyStore) ISecondary {
return &SecondaryManager{
topic: make(chan PutNotify), // channel is un-buffered which dispersing consumption across routines helps alleviate
Expand Down Expand Up @@ -92,6 +92,7 @@ func (sm *SecondaryManager) HandleRedundantWrites(ctx context.Context, commitmen
successes := 0

for _, src := range sources {
sm.log.Debug("Attempting to write to secondary storage", "backend", src.BackendType())
cb := sm.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodPut)

// for added safety - we retry the insertion 5x using a default exponential backoff
Expand All @@ -115,12 +116,12 @@ func (sm *SecondaryManager) HandleRedundantWrites(ctx context.Context, commitmen
return nil
}

// AsyncWriteEntry ... subscribes to put notifications posted to shared topic with primary router
// AsyncWriteEntry ... subscribes to put notifications posted to shared topic with primary manager
func (sm *SecondaryManager) AsyncWriteEntry() bool {
return sm.concurrentWrites
}

// WriteSubscriptionLoop ... subscribes to put notifications posted to shared topic with primary router
// WriteSubscriptionLoop ... subscribes to put notifications posted to shared topic with primary manager
func (sm *SecondaryManager) WriteSubscriptionLoop(ctx context.Context) {
sm.concurrentWrites = true

Expand Down

0 comments on commit 9aaabe3

Please sign in to comment.