diff --git a/exp/lighthorizon/index/backend/backend.go b/exp/lighthorizon/index/backend/backend.go new file mode 100644 index 0000000000..580e5f4d6e --- /dev/null +++ b/exp/lighthorizon/index/backend/backend.go @@ -0,0 +1,14 @@ +package index + +import types "github.com/stellar/go/exp/lighthorizon/index/types" + +// TODO: Use a more standardized filesystem-style backend, so we can re-use +// code +type Backend interface { + Flush(map[string]types.NamedIndices) error + FlushAccounts([]string) error + Read(account string) (types.NamedIndices, error) + ReadAccounts() ([]string, error) + FlushTransactions(map[string]*types.TrieIndex) error + ReadTransactions(prefix string) (*types.TrieIndex, error) +} diff --git a/exp/lighthorizon/index/file.go b/exp/lighthorizon/index/backend/file.go similarity index 90% rename from exp/lighthorizon/index/file.go rename to exp/lighthorizon/index/backend/file.go index 530c089582..dda1858cb8 100644 --- a/exp/lighthorizon/index/file.go +++ b/exp/lighthorizon/index/backend/file.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" + types "github.com/stellar/go/exp/lighthorizon/index/types" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/log" ) @@ -17,14 +18,6 @@ type FileBackend struct { parallel uint32 } -func NewFileStore(dir string, parallel uint32) (Store, error) { - backend, err := NewFileBackend(dir, parallel) - if err != nil { - return nil, err - } - return NewStore(backend) -} - func NewFileBackend(dir string, parallel uint32) (*FileBackend, error) { return &FileBackend{ dir: dir, @@ -32,7 +25,7 @@ func NewFileBackend(dir string, parallel uint32) (*FileBackend, error) { }, nil } -func (s *FileBackend) Flush(indexes map[string]map[string]*CheckpointIndex) error { +func (s *FileBackend) Flush(indexes map[string]types.NamedIndices) error { return parallelFlush(s.parallel, indexes, s.writeBatch) } @@ -88,7 +81,7 @@ func (s *FileBackend) writeBatch(b *batch) error { return nil } -func (s *FileBackend) FlushTransactions(indexes map[string]*TrieIndex) error { +func (s *FileBackend) FlushTransactions(indexes map[string]*types.TrieIndex) error { // TODO: Parallelize this for key, index := range indexes { path := filepath.Join(s.dir, "tx", key) @@ -125,7 +118,7 @@ func (s *FileBackend) FlushTransactions(indexes map[string]*TrieIndex) error { return nil } -func (s *FileBackend) Read(account string) (map[string]*CheckpointIndex, error) { +func (s *FileBackend) Read(account string) (types.NamedIndices, error) { log.Debugf("Opening index: %s", account) b, err := os.Open(filepath.Join(s.dir, account[:3], account)) if err != nil { @@ -185,7 +178,7 @@ func (s *FileBackend) ReadAccounts() ([]string, error) { return accounts, nil } -func (s *FileBackend) ReadTransactions(prefix string) (*TrieIndex, error) { +func (s *FileBackend) ReadTransactions(prefix string) (*types.TrieIndex, error) { log.Debugf("Opening index: %s", prefix) b, err := os.Open(filepath.Join(s.dir, "tx", prefix)) if err != nil { @@ -198,7 +191,7 @@ func (s *FileBackend) ReadTransactions(prefix string) (*TrieIndex, error) { return nil, os.ErrNotExist } defer zr.Close() - var index TrieIndex + var index types.TrieIndex _, err = index.ReadFrom(zr) if err != nil { log.Errorf("Unable to parse %s: %v", prefix, err) diff --git a/exp/lighthorizon/index/file_test.go b/exp/lighthorizon/index/backend/file_test.go similarity index 100% rename from exp/lighthorizon/index/file_test.go rename to exp/lighthorizon/index/backend/file_test.go diff --git a/exp/lighthorizon/index/gzip.go b/exp/lighthorizon/index/backend/gzip.go similarity index 73% rename from exp/lighthorizon/index/gzip.go rename to exp/lighthorizon/index/backend/gzip.go index c2a272e1c9..37a0f41ac1 100644 --- a/exp/lighthorizon/index/gzip.go +++ b/exp/lighthorizon/index/backend/gzip.go @@ -4,9 +4,11 @@ import ( "bytes" "compress/gzip" "io" + + types "github.com/stellar/go/exp/lighthorizon/index/types" ) -func writeGzippedTo(w io.Writer, indexes map[string]*CheckpointIndex) (int64, error) { +func writeGzippedTo(w io.Writer, indexes types.NamedIndices) (int64, error) { zw := gzip.NewWriter(w) var n int64 @@ -28,12 +30,12 @@ func writeGzippedTo(w io.Writer, indexes map[string]*CheckpointIndex) (int64, er return n, nil } -func readGzippedFrom(r io.Reader) (map[string]*CheckpointIndex, int64, error) { +func readGzippedFrom(r io.Reader) (types.NamedIndices, int64, error) { zr, err := gzip.NewReader(r) if err != nil { return nil, 0, err } - indexes := map[string]*CheckpointIndex{} + indexes := types.NamedIndices{} var buf bytes.Buffer var n int64 for { @@ -45,7 +47,7 @@ func readGzippedFrom(r io.Reader) (map[string]*CheckpointIndex, int64, error) { return nil, n, err } - ind, err := NewCheckpointIndexFromBytes(buf.Bytes()) + ind, err := types.NewCheckpointIndexFromBytes(buf.Bytes()) if err != nil { return nil, n, err } diff --git a/exp/lighthorizon/index/gzip_test.go b/exp/lighthorizon/index/backend/gzip_test.go similarity index 75% rename from exp/lighthorizon/index/gzip_test.go rename to exp/lighthorizon/index/backend/gzip_test.go index 32058b07ee..6efbc8c5e8 100644 --- a/exp/lighthorizon/index/gzip_test.go +++ b/exp/lighthorizon/index/backend/gzip_test.go @@ -4,12 +4,13 @@ import ( "bytes" "testing" + types "github.com/stellar/go/exp/lighthorizon/index/types" "github.com/stretchr/testify/require" ) func TestGzipWriteReadRoundtrip(t *testing.T) { - indexes := map[string]*CheckpointIndex{} - index := &CheckpointIndex{} + indexes := types.NamedIndices{} + index := &types.CheckpointIndex{} index.SetActive(5) indexes["A"] = index diff --git a/exp/lighthorizon/index/parallel_flush.go b/exp/lighthorizon/index/backend/parallel_flush.go similarity index 84% rename from exp/lighthorizon/index/parallel_flush.go rename to exp/lighthorizon/index/backend/parallel_flush.go index 37b8bbb39e..4f9b14e783 100644 --- a/exp/lighthorizon/index/parallel_flush.go +++ b/exp/lighthorizon/index/backend/parallel_flush.go @@ -5,17 +5,18 @@ import ( "sync/atomic" "time" + types "github.com/stellar/go/exp/lighthorizon/index/types" "github.com/stellar/go/support/log" ) type batch struct { account string - indexes map[string]*CheckpointIndex + indexes types.NamedIndices } type flushBatch func(b *batch) error -func parallelFlush(parallel uint32, allIndexes map[string]map[string]*CheckpointIndex, f flushBatch) error { +func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f flushBatch) error { var wg sync.WaitGroup batches := make(chan *batch, parallel) diff --git a/exp/lighthorizon/index/s3.go b/exp/lighthorizon/index/backend/s3.go similarity index 89% rename from exp/lighthorizon/index/s3.go rename to exp/lighthorizon/index/backend/s3.go index f57fa4dfda..59df939596 100644 --- a/exp/lighthorizon/index/s3.go +++ b/exp/lighthorizon/index/backend/s3.go @@ -15,6 +15,8 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/log" + + types "github.com/stellar/go/exp/lighthorizon/index/types" ) const BUCKET = "horizon-index" @@ -27,14 +29,6 @@ type S3Backend struct { prefix string } -func NewS3Store(awsConfig *aws.Config, prefix string, parallel uint32) (Store, error) { - backend, err := NewS3Backend(awsConfig, prefix, parallel) - if err != nil { - return nil, err - } - return NewStore(backend) -} - func NewS3Backend(awsConfig *aws.Config, prefix string, parallel uint32) (*S3Backend, error) { s3Session, err := session.NewSession(awsConfig) if err != nil { @@ -72,7 +66,7 @@ func (s *S3Backend) FlushAccounts(accounts []string) error { return nil } -func (s *S3Backend) Flush(indexes map[string]map[string]*CheckpointIndex) error { +func (s *S3Backend) Flush(indexes map[string]types.NamedIndices) error { return parallelFlush(s.parallel, indexes, s.writeBatch) } @@ -98,7 +92,7 @@ func (s *S3Backend) writeBatch(b *batch) error { return nil } -func (s *S3Backend) FlushTransactions(indexes map[string]*TrieIndex) error { +func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error { // TODO: Parallelize this var buf bytes.Buffer for key, index := range indexes { @@ -156,7 +150,7 @@ func (s *S3Backend) path(account string) string { return filepath.Join(s.prefix, account[:10], account) } -func (s *S3Backend) Read(account string) (map[string]*CheckpointIndex, error) { +func (s *S3Backend) Read(account string) (types.NamedIndices, error) { // Check if index exists in S3 log.Debugf("Downloading index: %s", account) var err error @@ -179,7 +173,7 @@ func (s *S3Backend) Read(account string) (map[string]*CheckpointIndex, error) { if n == 0 { return nil, os.ErrNotExist } - var indexes map[string]*CheckpointIndex + var indexes map[string]*types.CheckpointIndex indexes, _, err = readGzippedFrom(bytes.NewReader(b.Bytes())) if err != nil { log.Errorf("Unable to parse %s: %v", account, err) @@ -191,7 +185,7 @@ func (s *S3Backend) Read(account string) (map[string]*CheckpointIndex, error) { return nil, err } -func (s *S3Backend) ReadTransactions(prefix string) (*TrieIndex, error) { +func (s *S3Backend) ReadTransactions(prefix string) (*types.TrieIndex, error) { // Check if index exists in S3 log.Debugf("Downloading index: %s", prefix) b := &aws.WriteAtBuffer{} @@ -216,7 +210,7 @@ func (s *S3Backend) ReadTransactions(prefix string) (*TrieIndex, error) { } defer zr.Close() - var index TrieIndex + var index types.TrieIndex _, err = index.ReadFrom(zr) if err != nil { log.Errorf("Unable to parse %s: %v", prefix, err) diff --git a/exp/lighthorizon/index/cmd/batch/reduce/main.go b/exp/lighthorizon/index/cmd/batch/reduce/main.go index 0fa871d38d..ac80ac932b 100644 --- a/exp/lighthorizon/index/cmd/batch/reduce/main.go +++ b/exp/lighthorizon/index/cmd/batch/reduce/main.go @@ -10,6 +10,7 @@ import ( "sync" "github.com/stellar/go/exp/lighthorizon/index" + types "github.com/stellar/go/exp/lighthorizon/index/types" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/log" ) @@ -337,7 +338,7 @@ func (cfg *ReduceConfig) shouldProcessTx(txPrefix byte, routineIndex uint32) boo // For every index that exists in `dest`, finds the corresponding index in // `source` and merges it into `dest`'s version. -func mergeIndices(dest, source map[string]*index.CheckpointIndex) error { +func mergeIndices(dest, source map[string]*types.CheckpointIndex) error { for name, index := range dest { // The source doesn't contain this particular index. // diff --git a/exp/lighthorizon/index/connect.go b/exp/lighthorizon/index/connect.go index 4620046f29..cc0a9dd4b0 100644 --- a/exp/lighthorizon/index/connect.go +++ b/exp/lighthorizon/index/connect.go @@ -6,6 +6,8 @@ import ( "path/filepath" "github.com/aws/aws-sdk-go/aws" + + backend "github.com/stellar/go/exp/lighthorizon/index/backend" ) func Connect(backendUrl string) (Store, error) { @@ -31,3 +33,19 @@ func Connect(backendUrl string) (Store, error) { parsed.Scheme, backendUrl) } } + +func NewFileStore(dir string, parallel uint32) (Store, error) { + backend, err := backend.NewFileBackend(dir, parallel) + if err != nil { + return nil, err + } + return NewStore(backend) +} + +func NewS3Store(awsConfig *aws.Config, prefix string, parallel uint32) (Store, error) { + backend, err := backend.NewS3Backend(awsConfig, prefix, parallel) + if err != nil { + return nil, err + } + return NewStore(backend) +} diff --git a/exp/lighthorizon/index/store.go b/exp/lighthorizon/index/store.go index 640af10f76..a36e60ec6e 100644 --- a/exp/lighthorizon/index/store.go +++ b/exp/lighthorizon/index/store.go @@ -6,6 +6,9 @@ import ( "io" "os" "sync" + + backend "github.com/stellar/go/exp/lighthorizon/index/backend" + types "github.com/stellar/go/exp/lighthorizon/index/types" ) type Store interface { @@ -14,37 +17,26 @@ type Store interface { TransactionTOID(hash [32]byte) (int64, error) AddParticipantsToIndexes(checkpoint uint32, index string, participants []string) error AddParticipantsToIndexesNoBackend(checkpoint uint32, index string, participants []string) error - AddParticipantToIndexesNoBackend(participant string, indexes map[string]*CheckpointIndex) + AddParticipantToIndexesNoBackend(participant string, indexes types.NamedIndices) Flush() error FlushAccounts() error - Read(account string) (map[string]*CheckpointIndex, error) - ReadAccounts() ([]string, error) - ReadTransactions(prefix string) (*TrieIndex, error) - MergeTransactions(prefix string, other *TrieIndex) error -} - -// TODO: Use a more standardized filesystem-style backend, so we can re-use -// code -type Backend interface { - Flush(map[string]map[string]*CheckpointIndex) error - FlushAccounts([]string) error - Read(account string) (map[string]*CheckpointIndex, error) + Read(account string) (types.NamedIndices, error) ReadAccounts() ([]string, error) - FlushTransactions(map[string]*TrieIndex) error - ReadTransactions(prefix string) (*TrieIndex, error) + ReadTransactions(prefix string) (*types.TrieIndex, error) + MergeTransactions(prefix string, other *types.TrieIndex) error } type store struct { mutex sync.RWMutex - indexes map[string]map[string]*CheckpointIndex - txIndexes map[string]*TrieIndex - backend Backend + indexes map[string]types.NamedIndices + txIndexes map[string]*types.TrieIndex + backend backend.Backend } -func NewStore(backend Backend) (Store, error) { +func NewStore(backend backend.Backend) (Store, error) { return &store{ - indexes: map[string]map[string]*CheckpointIndex{}, - txIndexes: map[string]*TrieIndex{}, + indexes: map[string]types.NamedIndices{}, + txIndexes: map[string]*types.TrieIndex{}, backend: backend, }, nil } @@ -63,7 +55,7 @@ func (s *store) FlushAccounts() error { return s.backend.FlushAccounts(s.accounts()) } -func (s *store) Read(account string) (map[string]*CheckpointIndex, error) { +func (s *store) Read(account string) (types.NamedIndices, error) { return s.backend.Read(account) } @@ -71,11 +63,11 @@ func (s *store) ReadAccounts() ([]string, error) { return s.backend.ReadAccounts() } -func (s *store) ReadTransactions(prefix string) (*TrieIndex, error) { +func (s *store) ReadTransactions(prefix string) (*types.TrieIndex, error) { return s.getCreateTrieIndex(prefix) } -func (s *store) MergeTransactions(prefix string, other *TrieIndex) error { +func (s *store) MergeTransactions(prefix string, other *types.TrieIndex) error { index, err := s.getCreateTrieIndex(prefix) if err != nil { return err @@ -103,12 +95,12 @@ func (s *store) Flush() error { } // clear indexes to save memory - s.indexes = map[string]map[string]*CheckpointIndex{} + s.indexes = map[string]types.NamedIndices{} if err := s.backend.FlushTransactions(s.txIndexes); err != nil { return err } - s.txIndexes = map[string]*TrieIndex{} + s.txIndexes = map[string]*types.TrieIndex{} return nil } @@ -147,12 +139,12 @@ func (s *store) AddParticipantsToIndexesNoBackend(checkpoint uint32, index strin for _, participant := range participants { _, ok := s.indexes[participant] if !ok { - s.indexes[participant] = map[string]*CheckpointIndex{} + s.indexes[participant] = map[string]*types.CheckpointIndex{} } ind, ok := s.indexes[participant][index] if !ok { - ind = &CheckpointIndex{} + ind = &types.CheckpointIndex{} s.indexes[participant][index] = ind } @@ -165,7 +157,7 @@ func (s *store) AddParticipantsToIndexesNoBackend(checkpoint uint32, index strin return nil } -func (s *store) AddParticipantToIndexesNoBackend(participant string, indexes map[string]*CheckpointIndex) { +func (s *store) AddParticipantToIndexesNoBackend(participant string, indexes types.NamedIndices) { s.mutex.Lock() defer s.mutex.Unlock() s.indexes[participant] = indexes @@ -185,14 +177,14 @@ func (s *store) AddParticipantsToIndexes(checkpoint uint32, index string, partic return nil } -func (s *store) getCreateIndex(account, id string) (*CheckpointIndex, error) { +func (s *store) getCreateIndex(account, id string) (*types.CheckpointIndex, error) { s.mutex.Lock() defer s.mutex.Unlock() // Check if we already have it loaded accountIndexes, ok := s.indexes[account] if !ok { - accountIndexes = map[string]*CheckpointIndex{} + accountIndexes = types.NamedIndices{} } ind, ok := accountIndexes[id] if ok { @@ -210,7 +202,7 @@ func (s *store) getCreateIndex(account, id string) (*CheckpointIndex, error) { ind, ok = accountIndexes[id] if !ok { // Not found anywhere, make a new one. - ind = &CheckpointIndex{} + ind = &types.CheckpointIndex{} accountIndexes[id] = ind } s.indexes[account] = accountIndexes @@ -226,7 +218,7 @@ func (s *store) NextActive(account, indexId string, afterCheckpoint uint32) (uin return ind.NextActive(afterCheckpoint) } -func (s *store) getCreateTrieIndex(prefix string) (*TrieIndex, error) { +func (s *store) getCreateTrieIndex(prefix string) (*types.TrieIndex, error) { s.mutex.Lock() defer s.mutex.Unlock() @@ -247,7 +239,7 @@ func (s *store) getCreateTrieIndex(prefix string) (*TrieIndex, error) { index, ok = s.txIndexes[prefix] if !ok { // Not found anywhere, make a new one. - index = &TrieIndex{} + index = &types.TrieIndex{} s.txIndexes[prefix] = index } diff --git a/exp/lighthorizon/index/main.go b/exp/lighthorizon/index/types/bitmap.go similarity index 99% rename from exp/lighthorizon/index/main.go rename to exp/lighthorizon/index/types/bitmap.go index a14713b37a..a3446c9bf3 100644 --- a/exp/lighthorizon/index/main.go +++ b/exp/lighthorizon/index/types/bitmap.go @@ -17,6 +17,8 @@ type CheckpointIndex struct { lastCheckpoint uint32 } +type NamedIndices map[string]*CheckpointIndex + func NewCheckpointIndexFromBytes(b []byte) (*CheckpointIndex, error) { xdrCheckpoint := xdr.CheckpointIndex{} err := xdrCheckpoint.UnmarshalBinary(b) diff --git a/exp/lighthorizon/index/main_test.go b/exp/lighthorizon/index/types/bitmap_test.go similarity index 100% rename from exp/lighthorizon/index/main_test.go rename to exp/lighthorizon/index/types/bitmap_test.go diff --git a/exp/lighthorizon/index/trie.go b/exp/lighthorizon/index/types/trie.go similarity index 100% rename from exp/lighthorizon/index/trie.go rename to exp/lighthorizon/index/types/trie.go diff --git a/exp/lighthorizon/index/trie_test.go b/exp/lighthorizon/index/types/trie_test.go similarity index 100% rename from exp/lighthorizon/index/trie_test.go rename to exp/lighthorizon/index/types/trie_test.go