Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TTL + table store #811

Merged
merged 21 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions common/kvstore/Batch.go

This file was deleted.

32 changes: 32 additions & 0 deletions common/kvstore/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kvstore

import "time"

// Batch is a collection of key / value pairs that will be written atomically to a database.
// Although it is thread safe to modify different batches in parallel or to modify a batch while
// the store is being modified, it is not thread safe to concurrently modify the same batch.
type Batch[K any] interface {
// Put stores the given key / value pair in the batch, overwriting any existing value for that key.
// If nil is passed as the value, a byte slice of length 0 will be stored.
Put(key K, value []byte)
// Delete removes the key from the batch.
Delete(key K)
// Apply atomically writes all the key / value pairs in the batch to the database.
Apply() error
// Size returns the number of operations in the batch.
Size() uint32
}

// TTLBatch is a collection of key / value pairs that will be written atomically to a database with
// time-to-live (TTL) or expiration times. Although it is thread safe to modify different batches in
// parallel or to modify a batch while the store is being modified, it is not thread safe to concurrently
// modify the same batch.
type TTLBatch[K any] interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks this is not needed?

  • if it's a Table, there is TableBatch, operating on the raw keys
  • if it's a TableStore, there is TableStoreBatch, operating on the namespaced keys

And the ttl related interfaces are always there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batch APIs have now been simplified so that they don't have generics. Are there additional simplifications you'd like to see?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we still need two batch interfaces if we want to support batches that can set different TTLs for operations within the same batch. The core reason is that the basic Store API does not have a concept of a TTL, meaning we need a batch interface that doesn't have TTLs.

Batch[K]
// PutWithTTL stores the given key / value pair in the batch with a time-to-live (TTL) or expiration time.
// If nil is passed as the value, a byte slice of length 0 will be stored.
PutWithTTL(key K, value []byte, ttl time.Duration)
// PutWithExpiration stores the given key / value pair in the batch with an expiration time.
// If nil is passed as the value, a byte slice of length 0 will be stored.
PutWithExpiration(key K, value []byte, expiryTime time.Time)
}
27 changes: 27 additions & 0 deletions common/kvstore/key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package kvstore

import "errors"

// ErrInvalidKey is returned when a key cannot be interpreted as the requested type.
var ErrInvalidKey = errors.New("invalid key")

// Key represents a key in a TableStore. Each key is scoped to a specific table.
type Key interface {
// Bytes returns the key as a byte slice. Does not include internal metadata (i.e. the table).
Bytes() []byte
// Raw returns the raw byte slice that represents the key. This value
// may not be equal to the byte slice that was used to create the key, and
// should be treated as an opaque value.
Raw() []byte
// Builder returns the KeyBuilder that created this key.
Builder() KeyBuilder
}

// KeyBuilder is used to create keys for a TableStore. Each KeyBuilder is scoped to a particular table,
// and can be used to create keys that are within that table.
type KeyBuilder interface {
// TableName returns the name of the table that this KeyBuilder is scoped to.
TableName() string
// Key creates a key from a byte slice.
Key(key []byte) Key
}
6 changes: 3 additions & 3 deletions common/kvstore/leveldb/leveldb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)

var _ kvstore.Store = &levelDBStore{}
var _ kvstore.Store[[]byte] = &levelDBStore{}

// levelDBStore implements kvstore.Store interfaces with levelDB as the backend engine.
type levelDBStore struct {
Expand All @@ -25,7 +25,7 @@ type levelDBStore struct {
}

// NewStore returns a new levelDBStore built using LevelDB.
func NewStore(logger logging.Logger, path string) (kvstore.Store, error) {
func NewStore(logger logging.Logger, path string) (kvstore.Store[[]byte], error) {
levelDB, err := leveldb.OpenFile(path, nil)

if err != nil {
Expand Down Expand Up @@ -88,7 +88,7 @@ func (store *levelDBStore) WriteBatch(keys [][]byte, values [][]byte) error {
}

// NewBatch creates a new batch for the store.
func (store *levelDBStore) NewBatch() kvstore.StoreBatch {
func (store *levelDBStore) NewBatch() kvstore.Batch[[]byte] {
return &levelDBBatch{
store: store,
batch: new(leveldb.Batch),
Expand Down
6 changes: 3 additions & 3 deletions common/kvstore/mapstore/map_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"sync"
)

var _ kvstore.Store = &mapStore{}
var _ kvstore.Store[[]byte] = &mapStore{}

// mapStore is a simple in-memory implementation of KVStore. Designed more as a correctness test than a
// production implementation -- there are things that may not be performant with this implementation.
Expand All @@ -19,7 +19,7 @@ type mapStore struct {
}

// NewStore creates a new mapStore.
func NewStore() kvstore.Store {
func NewStore() kvstore.Store[[]byte] {
return &mapStore{
data: make(map[string][]byte),
}
Expand Down Expand Up @@ -79,7 +79,7 @@ func (store *mapStore) WriteBatch(keys, values [][]byte) error {
}

// NewBatch creates a new batch for the store.
func (store *mapStore) NewBatch() kvstore.StoreBatch {
func (store *mapStore) NewBatch() kvstore.Batch[[]byte] {
return &batch{
store: store,
keys: make([][]byte, 0),
Expand Down
16 changes: 7 additions & 9 deletions common/kvstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,31 @@ import (
// ErrNotFound is returned when a key is not found in the database.
var ErrNotFound = errors.New("not found")

// StoreBatch is a collection of operations that can be applied atomically to a Store.
type StoreBatch Batch[[]byte]

// Store implements a key-value store. May be backed by a database like LevelDB.
// The generic type K is the type of the keys in the store.
//
// Implementations of this interface are expected to be thread-safe.
type Store interface {
type Store[K any] interface {

// Put stores the given key / value pair in the database, overwriting any existing value for that key.
// If nil is passed as the value, a byte slice of length 0 will be stored.
Put(key []byte, value []byte) error
Put(k K, value []byte) error

// Get retrieves the value for the given key. Returns a ErrNotFound error if the key does not exist.
// The value returned is safe to modify.
Get(key []byte) ([]byte, error)
Get(k K) ([]byte, error)

// Delete removes the key from the database. Does not return an error if the key does not exist.
Delete(key []byte) error
Delete(k K) error

// NewBatch creates a new batch that can be used to perform multiple operations atomically.
NewBatch() StoreBatch
NewBatch() Batch[K]

// NewIterator returns an iterator that can be used to iterate over a subset of the keys in the database.
// Only keys with the given prefix will be iterated. The iterator must be closed by calling Release() when done.
// The iterator will return keys in lexicographically sorted order. The iterator walks over a consistent snapshot
// of the database, so it will not see any writes that occur after the iterator is created.
NewIterator(prefix []byte) (iterator.Iterator, error)
NewIterator(prefix K) (iterator.Iterator, error)

// Shutdown shuts down the store, flushing any remaining data to disk.
//
Expand Down
80 changes: 41 additions & 39 deletions common/kvstore/table.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
package kvstore

import "errors"

// ErrTableLimitExceeded is returned when the maximum number of tables has been reached.
var ErrTableLimitExceeded = errors.New("table limit exceeded")
import (
"errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"time"
)

// ErrTableNotFound is returned when a table is not found.
var ErrTableNotFound = errors.New("table not found")

// Table can be used to operate on data in a specific table in a TableStore.
type Table interface {
// Store permits access to the table as if it were a store.
Store

// Name returns the name of the table.
Name() string

// TableKey creates a new key scoped to this table that can be used for batch operations that modify this table.
TableKey(key []byte) TableKey
}

// TableKey is a key scoped to a particular table. It can be used to perform batch operations that modify multiple
// table keys atomically.
type TableKey []byte

// TableBatch is a collection of operations that can be applied atomically to a TableStore.
type TableBatch Batch[TableKey]

// TableStore implements a key-value store, with the addition of the abstraction of tables.
// A "table" in this context is a disjoint keyspace. Keys in one table to not collide with keys in another table,
// and keys within a particular table can be iterated over efficiently.
Expand All @@ -36,20 +18,40 @@ type TableBatch Batch[TableKey]
//
// Implementations of this interface are expected to be thread-safe, except where noted.
type TableStore interface {

// GetTable gets the table with the given name. If the table does not exist, it is first created.
// Returns ErrTableNotFound if the table does not exist and cannot be created.
GetTable(name string) (Table, error)

// GetTables returns a list of all tables in the store in no particular order.
GetTables() []Table

// NewBatch creates a new batch that can be used to perform multiple operations across tables atomically.
NewBatch() TableBatch

// Shutdown shuts down the store, flushing any remaining data to disk.
Shutdown() error

// Destroy shuts down and permanently deletes all data in the store.
Destroy() error
Store[Key]

// GetKeyBuilder gets the key builder for a particular table. Returns ErrTableNotFound if the table does not exist.
// The returned KeyBuilder can be used to interact with the table.
//
// Warning: Do not use key builders created by one TableStore instance with another TableStore instance.
// This may result in odd and undefined behavior.
GetKeyBuilder(name string) (KeyBuilder, error)

// GetKeyBuilders returns all key builders in the store.
GetKeyBuilders() []KeyBuilder

// GetTables returns a list of the table names currently in the store.
GetTables() []string

// PutWithTTL adds a key-value pair to the store that expires after a specified duration.
// Key is eventually deleted after the TTL elapses.
//
// Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern
// may be implemented in the future if a use case is identified.
PutWithTTL(key Key, value []byte, ttl time.Duration) error

// PutWithExpiration adds a key-value pair to the store that expires at a specified time.
// Key is eventually deleted after the expiry time.
//
// Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern
// may be implemented in the future if a use case is identified.
PutWithExpiration(key Key, value []byte, expiryTime time.Time) error

// NewTTLBatch creates a new TTLBatch that can be used to perform multiple operations atomically.
// Use this instead of NewBatch to create a batch that supports TTL/expiration.
NewTTLBatch() TTLBatch[Key]

// NewTableIterator returns an iterator that can be used to iterate over all keys in a table.
// Equivalent to NewIterator(keyBuilder.Key([]byte{})).
NewTableIterator(keyBuilder KeyBuilder) (iterator.Iterator, error)
}
60 changes: 60 additions & 0 deletions common/kvstore/tablestore/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package tablestore

import "time"

// StoreType describes the underlying store implementation.
type StoreType int

const (
// LevelDB is a LevelDB-backed store.
LevelDB StoreType = iota
// MapStore is an in-memory store. This store does not preserve data across restarts.
MapStore
)

// Config is the configuration for a TableStore.
type Config struct {
// The type of the base store. Default is LevelDB.
Type StoreType
// The path to the file system directory where the store will write its data. Default is nil.
// Some store implementations may ignore this field (e.g. MapStore). Other store implementations may require
// this field to be set (e.g. LevelDB).
Path *string
// If true, the store will perform garbage collection on a background goroutine. Default is true.
GarbageCollectionEnabled bool
// If garbage collection is enabled, this is the interval at which it will run. Default is 5 minutes.
GarbageCollectionInterval time.Duration
// If garbage collection is enabled, this is the maximum number of entries to delete in a single batch during
// garbage collection. Default is 1024.
GarbageCollectionBatchSize uint32
// The list of tables to create on startup. Any pre-existing table not in this list will be deleted. If
// this list is nil, the previous schema will be carried forward with no modifications. Default is nil.
Schema []string
}

// DefaultConfig returns a Config with default values.
func DefaultConfig() *Config {
return &Config{
Type: LevelDB,
Path: nil,
GarbageCollectionEnabled: true,
GarbageCollectionInterval: 5 * time.Minute,
GarbageCollectionBatchSize: 1024,
Schema: nil,
}
}

// DefaultLevelDBConfig returns a Config with default values for a LevelDB store.
func DefaultLevelDBConfig(path string) *Config {
config := DefaultConfig()
config.Type = LevelDB
config.Path = &path
return config
}

// DefaultMapStoreConfig returns a Config with default values for a MapStore.
func DefaultMapStoreConfig() *Config {
config := DefaultConfig()
config.Type = MapStore
return config
}
27 changes: 27 additions & 0 deletions common/kvstore/tablestore/key_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package tablestore

import (
tu "github.com/Layr-Labs/eigenda/common/testutils"
"github.com/stretchr/testify/assert"
"testing"
)

func TestGetName(t *testing.T) {
tu.InitializeRandom()

tableName := tu.RandomString(10)

kb := newKeyBuilder(tableName, 0)
assert.Equal(t, tableName, kb.TableName())
}

func TestBytesRoundTrip(t *testing.T) {
tu.InitializeRandom()

tableName := tu.RandomString(10)
b := tu.RandomBytes(10)

kb := newKeyBuilder(tableName, 0)
k := kb.Key(b)
assert.Equal(t, b, k.Bytes())
}
Loading
Loading