Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TTL + table store #811

Merged
merged 21 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions common/kvstore/Batch.go

This file was deleted.

32 changes: 32 additions & 0 deletions common/kvstore/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kvstore

import "time"

// Batch is a collection of key / value pairs that will be written atomically to a database.
// Although it is thread safe to modify different batches in parallel or to modify a batch while
// the store is being modified, it is not thread safe to concurrently modify the same batch.
type Batch[K any] interface {
// Put stores the given key / value pair in the batch, overwriting any existing value for that key.
// If nil is passed as the value, a byte slice of length 0 will be stored.
Put(key K, value []byte)
// Delete removes the key from the batch.
Delete(key K)
// Apply atomically writes all the key / value pairs in the batch to the database.
Apply() error
// Size returns the number of operations in the batch.
Size() uint32
}

// TTLBatch is a collection of key / value pairs that will be written atomically to a database with
// time-to-live (TTL) or expiration times. Although it is thread safe to modify different batches in
// parallel or to modify a batch while the store is being modified, it is not thread safe to concurrently
// modify the same batch.
type TTLBatch[K any] interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks this is not needed?

  • if it's a Table, there is TableBatch, operating on the raw keys
  • if it's a TableStore, there is TableStoreBatch, operating on the namespaced keys

And the ttl related interfaces are always there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batch APIs have now been simplified so that they don't have generics. Are there additional simplifications you'd like to see?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we still need two batch interfaces if we want to support batches that can set different TTLs for operations within the same batch. The core reason is that the basic Store API does not have a concept of a TTL, meaning we need a batch interface that doesn't have TTLs.

Batch[K]
// PutWithTTL stores the given key / value pair in the batch with a time-to-live (TTL) or expiration time.
// If nil is passed as the value, a byte slice of length 0 will be stored.
PutWithTTL(key K, value []byte, ttl time.Duration)
// PutWithExpiration stores the given key / value pair in the batch with an expiration time.
// If nil is passed as the value, a byte slice of length 0 will be stored.
PutWithExpiration(key K, value []byte, expiryTime time.Time)
}
42 changes: 33 additions & 9 deletions common/kvstore/table.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,55 @@
package kvstore

import "errors"

// ErrTableLimitExceeded is returned when the maximum number of tables has been reached.
var ErrTableLimitExceeded = errors.New("table limit exceeded")
import (
"errors"
"time"
)

// ErrTableNotFound is returned when a table is not found.
var ErrTableNotFound = errors.New("table not found")

// TTLStoreBatch is a collection of key / value pairs that will be written atomically to a database with
// time-to-live (TTL) or expiration times. Although it is thread safe to modify different batches in
// parallel or to modify a batch while the store is being modified, it is not thread safe to concurrently
// modify the same batch.
type TTLStoreBatch TTLBatch[[]byte]

// Table can be used to operate on data in a specific table in a TableStore.
type Table interface {
// Store permits access to the table as if it were a store.
Store

// Name returns the name of the table.
Name() string

// TableKey creates a new key scoped to this table that can be used for batch operations that modify this table.
// TableKey creates a new key scoped to this table that can be used for TableStoreBatch
// operations that modify this table.
TableKey(key []byte) TableKey

// PutWithTTL adds a key-value pair to the store that expires after a specified duration.
// Key is eventually deleted after the TTL elapses.
//
// Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern
// may be implemented in the future if a use case is identified.
PutWithTTL(key []byte, value []byte, ttl time.Duration) error

// PutWithExpiration adds a key-value pair to the store that expires at a specified time.
// Key is eventually deleted after the expiry time.
//
// Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern
// may be implemented in the future if a use case is identified.
PutWithExpiration(key []byte, value []byte, expiryTime time.Time) error

// NewTTLBatch creates a new TTLBatch that can be used to perform multiple operations atomically.
// Use this instead of NewBatch to create a batch that supports TTL/expiration.
NewTTLBatch() TTLStoreBatch
}

// TableKey is a key scoped to a particular table. It can be used to perform batch operations that modify multiple
// table keys atomically.
type TableKey []byte

// TableBatch is a collection of operations that can be applied atomically to a TableStore.
type TableBatch Batch[TableKey]
// TableStoreBatch is a collection of operations that can be applied atomically to a TableStore.
type TableStoreBatch TTLBatch[TableKey]

// TableStore implements a key-value store, with the addition of the abstraction of tables.
// A "table" in this context is a disjoint keyspace. Keys in one table to not collide with keys in another table,
Expand All @@ -45,7 +69,7 @@ type TableStore interface {
GetTables() []Table

// NewBatch creates a new batch that can be used to perform multiple operations across tables atomically.
NewBatch() TableBatch
NewBatch() TableStoreBatch

// Shutdown shuts down the store, flushing any remaining data to disk.
Shutdown() error
Expand Down
60 changes: 60 additions & 0 deletions common/kvstore/tablestore/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package tablestore

import "time"

// StoreType describes the underlying store implementation.
type StoreType int

const (
// LevelDB is a LevelDB-backed store.
LevelDB StoreType = iota
// MapStore is an in-memory store. This store does not preserve data across restarts.
MapStore
)

// Config is the configuration for a TableStore.
type Config struct {
// The type of the base store. Default is LevelDB.
Type StoreType
// The path to the file system directory where the store will write its data. Default is nil.
// Some store implementations may ignore this field (e.g. MapStore). Other store implementations may require
// this field to be set (e.g. LevelDB).
Path *string
// If true, the store will perform garbage collection on a background goroutine. Default is true.
GarbageCollectionEnabled bool
// If garbage collection is enabled, this is the interval at which it will run. Default is 5 minutes.
GarbageCollectionInterval time.Duration
// If garbage collection is enabled, this is the maximum number of entries to delete in a single batch during
// garbage collection. Default is 1024.
GarbageCollectionBatchSize uint32
// The list of tables to create on startup. Any pre-existing table not in this list will be deleted. If
// this list is nil, the previous schema will be carried forward with no modifications. Default is nil.
Schema []string
}

// DefaultConfig returns a Config with default values.
func DefaultConfig() *Config {
return &Config{
Type: LevelDB,
Path: nil,
GarbageCollectionEnabled: true,
GarbageCollectionInterval: 5 * time.Minute,
GarbageCollectionBatchSize: 1024,
Schema: nil,
}
}

// DefaultLevelDBConfig returns a Config with default values for a LevelDB store.
func DefaultLevelDBConfig(path string) *Config {
config := DefaultConfig()
config.Type = LevelDB
config.Path = &path
return config
}

// DefaultMapStoreConfig returns a Config with default values for a MapStore.
func DefaultMapStoreConfig() *Config {
config := DefaultConfig()
config.Type = MapStore
return config
}
118 changes: 110 additions & 8 deletions common/kvstore/tablestore/table_store.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
package tablestore

import (
"context"
"github.com/Layr-Labs/eigenda/common/kvstore"
"github.com/Layr-Labs/eigensdk-go/logging"
"sync"
"time"
)

var _ kvstore.TableStore = &tableStore{}

// tableStore is an implementation of TableStore that wraps a Store.
type tableStore struct {
// the context for the store
ctx context.Context

// the cancel function for the store
cancel context.CancelFunc

// this wait group is completed when the garbage collection goroutine is done
waitGroup *sync.WaitGroup

// the logger for the store
logger logging.Logger

// A base store implementation that this TableStore wraps.
base kvstore.Store

// A map from table names to tables.
tableMap map[string]kvstore.Table

// A map containing expiration times. Keys in this table are made up of a timestamp prepended to a key.
// The value is an empty byte slice. Iterating over this table will return keys in order of expiration time.
expirationTable kvstore.Table
}

// wrapper wraps the given Store to create a TableStore.
Expand All @@ -26,13 +43,35 @@ type tableStore struct {
func newTableStore(
logger logging.Logger,
base kvstore.Store,
tables map[string]kvstore.Table) kvstore.TableStore {
tableIDMap map[uint32]string,
expirationTable kvstore.Table,
gcEnabled bool,
gcPeriod time.Duration,
gcBatchSize uint32) kvstore.TableStore {

ctx, cancel := context.WithCancel(context.Background())
waitGroup := &sync.WaitGroup{}

store := &tableStore{
ctx: ctx,
cancel: cancel,
waitGroup: waitGroup,
logger: logger,
base: base,
tableMap: make(map[string]kvstore.Table),
expirationTable: expirationTable,
}

for prefix, name := range tableIDMap {
table := newTableView(base, name, prefix, store.Shutdown, store.Destroy, store.NewBatch)
store.tableMap[name] = table
}

return &tableStore{
logger: logger,
base: base,
tableMap: tables,
if gcEnabled {
store.expireKeysInBackground(gcPeriod, gcBatchSize)
}

return store
}

// GetTable gets the table with the given name. If the table does not exist, it is first created.
Expand All @@ -56,19 +95,82 @@ func (t *tableStore) GetTables() []kvstore.Table {
}

// NewBatch creates a new batch for writing to the store.
func (t *tableStore) NewBatch() kvstore.TableBatch {
func (t *tableStore) NewBatch() kvstore.TableStoreBatch {
return &tableStoreBatch{
store: t,
batch: t.base.NewBatch(),
batch: t.base.NewBatch(),
expirationTable: t.expirationTable,
}
}

// ExpireKeysInBackground spawns a background goroutine that periodically checks for expired keys and deletes them.
func (t *tableStore) expireKeysInBackground(gcPeriod time.Duration, gcBatchSize uint32) {
t.waitGroup.Add(1)
ticker := time.NewTicker(gcPeriod)
go func() {
defer t.waitGroup.Done()
for {
select {
case now := <-ticker.C:
err := t.expireKeys(now, gcBatchSize)
if err != nil {
t.logger.Error("Error expiring keys", err)
continue
}
case <-t.ctx.Done():
ticker.Stop()
return
}
}
}()
}

// Delete all keys with a TTL that has expired.
func (t *tableStore) expireKeys(now time.Time, gcBatchSize uint32) error {
it, err := t.expirationTable.NewIterator(nil)
if err != nil {
return err
}
defer it.Release()

batch := t.NewBatch()

for it.Next() {
expiryKey := it.Key()
expiryTimestamp, baseKey := parsePrependedTimestamp(expiryKey)

if expiryTimestamp.After(now) {
// No more values to expire
break
}

batch.Delete(baseKey)
batch.Delete(expiryKey)

if batch.Size() >= gcBatchSize {
err = batch.Apply()
if err != nil {
return err
}
batch = t.NewBatch()
}
}

if batch.Size() > 0 {
return batch.Apply()
}
return nil
}

// Shutdown shuts down the store, flushing any remaining cached data to disk.
func (t *tableStore) Shutdown() error {
t.cancel()
t.waitGroup.Wait()
return t.base.Shutdown()
}

// Destroy shuts down and permanently deletes all data in the store.
func (t *tableStore) Destroy() error {
t.cancel()
t.waitGroup.Wait()
return t.base.Destroy()
}
25 changes: 22 additions & 3 deletions common/kvstore/tablestore/table_store_batch.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,30 @@
package tablestore

import "github.com/Layr-Labs/eigenda/common/kvstore"
import (
"github.com/Layr-Labs/eigenda/common/kvstore"
"time"
)

var _ kvstore.TableStoreBatch = &tableStoreBatch{}

// tableStoreBatch is a batch for writing to a table store.
type tableStoreBatch struct {
store *tableStore
batch kvstore.StoreBatch
batch kvstore.StoreBatch
expirationTable kvstore.Table
}

// PutWithTTL adds a key-value pair to the batch that expires after a specified duration.
func (t *tableStoreBatch) PutWithTTL(key kvstore.TableKey, value []byte, ttl time.Duration) {
expirationTime := time.Now().Add(ttl)
t.PutWithExpiration(key, value, expirationTime)
}

// PutWithExpiration adds a key-value pair to the batch that expires at a specified time.
func (t *tableStoreBatch) PutWithExpiration(key kvstore.TableKey, value []byte, expiryTime time.Time) {
expirationKey := t.expirationTable.TableKey(prependTimestamp(expiryTime, key))

t.Put(key, value)
t.Put(expirationKey, make([]byte, 0))
}

// Put adds a key-value pair to the batch.
Expand Down
Loading
Loading