From ea97ba7635af838c6cfcb2b61e72341cd2ec2eb4 Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Thu, 20 Apr 2023 14:32:23 +0200 Subject: [PATCH] fix: fix memory leak with store logs worker not closed (#257) --- pkg/storage/sqlstorage/driver.go | 2 +- pkg/storage/sqlstorage/ledger/logs.go | 2 +- pkg/storage/sqlstorage/ledger/store.go | 32 ++++++++++++++++++-------- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/pkg/storage/sqlstorage/driver.go b/pkg/storage/sqlstorage/driver.go index 227f7b8e8..6b315a25c 100644 --- a/pkg/storage/sqlstorage/driver.go +++ b/pkg/storage/sqlstorage/driver.go @@ -122,7 +122,7 @@ func (d *Driver) GetLedgerStore(ctx context.Context, name string, create bool) ( return schema.Close(context.Background()) }, func(ctx context.Context) error { return d.GetSystemStore().DeleteLedger(ctx, name) - }, d.storeConfig) + }, d.storeConfig, true) if err != nil { return nil, false, errors.Wrap(err, "creating ledger store") } diff --git a/pkg/storage/sqlstorage/ledger/logs.go b/pkg/storage/sqlstorage/ledger/logs.go index 3f4366abb..bf57090a6 100644 --- a/pkg/storage/sqlstorage/ledger/logs.go +++ b/pkg/storage/sqlstorage/ledger/logs.go @@ -136,7 +136,7 @@ func (s *Store) batchLogs(ctx context.Context, logs []*core.Log) error { } func (s *Store) AppendLog(ctx context.Context, log *core.Log) error { - if !s.isInitialized { + if !s.isInitialized || s.logsBatchWorker == nil { return storageerrors.StorageError(storage.ErrStoreNotInitialized) } recordMetrics := s.instrumentalized(ctx, "append_log") diff --git a/pkg/storage/sqlstorage/ledger/store.go b/pkg/storage/sqlstorage/ledger/store.go index a4947937c..b441e67bc 100644 --- a/pkg/storage/sqlstorage/ledger/store.go +++ b/pkg/storage/sqlstorage/ledger/store.go @@ -79,6 +79,12 @@ func (s *Store) Initialize(ctx context.Context) (bool, error) { } func (s *Store) Close(ctx context.Context) error { + if s.logsBatchWorker != nil { + if err := s.logsBatchWorker.Stop(ctx); err != nil { + return err + } + } + return s.onClose(ctx) } @@ -98,9 +104,14 @@ func (s *Store) RunInTransaction(ctx context.Context, f func(ctx context.Context newStore, err := NewStore( ctx, schema.NewSchema(tx.Tx, s.schema.Name()), - s.onClose, - s.onDelete, + func(ctx context.Context) error { + return nil + }, + func(ctx context.Context) error { + return nil + }, s.storeConfig, + false, ) if err != nil { return errors.Wrap(err, "creating new store") @@ -110,6 +121,7 @@ func (s *Store) RunInTransaction(ctx context.Context, f func(ctx context.Context defer func() { _ = tx.Rollback() + _ = newStore.Close(context.Background()) }() err = f(ctx, newStore) @@ -138,6 +150,7 @@ func NewStore( schema schema.Schema, onClose, onDelete func(ctx context.Context) error, storeConfig StoreConfig, + createLogsWorker bool, ) (*Store, error) { s := &Store{ schema: schema, @@ -146,19 +159,20 @@ func NewStore( storeConfig: storeConfig, } - logsBatchWorker := worker.NewWorker(s.batchLogs, storeConfig.StoreWorkerConfig) - s.logsBatchWorker = logsBatchWorker - metricsRegistry, err := metrics.RegisterSQLStorageMetrics(s.schema.Name()) if err != nil { return nil, errors.Wrap(err, "registering metrics") } s.metricsRegistry = metricsRegistry - go logsBatchWorker.Run(logging.ContextWithLogger( - context.Background(), - logging.FromContext(ctx), - )) + if createLogsWorker { + logsBatchWorker := worker.NewWorker(s.batchLogs, storeConfig.StoreWorkerConfig) + s.logsBatchWorker = logsBatchWorker + go logsBatchWorker.Run(logging.ContextWithLogger( + context.Background(), + logging.FromContext(ctx), + )) + } return s, nil }