Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/lighthorizon: Restructure index package into sensible sub-packages #4427

Merged
merged 5 commits into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions exp/lighthorizon/index/backend/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package index

import types "github.com/stellar/go/exp/lighthorizon/index/types"

// TODO: Use a more standardized filesystem-style backend, so we can re-use
// code
type Backend interface {
Flush(map[string]types.NamedIndices) error
FlushAccounts([]string) error
Read(account string) (types.NamedIndices, error)
ReadAccounts() ([]string, error)
FlushTransactions(map[string]*types.TrieIndex) error
ReadTransactions(prefix string) (*types.TrieIndex, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"

types "github.com/stellar/go/exp/lighthorizon/index/types"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
)
Expand All @@ -17,22 +18,14 @@ type FileBackend struct {
parallel uint32
}

func NewFileStore(dir string, parallel uint32) (Store, error) {
backend, err := NewFileBackend(dir, parallel)
if err != nil {
return nil, err
}
return NewStore(backend)
}

func NewFileBackend(dir string, parallel uint32) (*FileBackend, error) {
return &FileBackend{
dir: dir,
parallel: parallel,
}, nil
}

func (s *FileBackend) Flush(indexes map[string]map[string]*CheckpointIndex) error {
func (s *FileBackend) Flush(indexes map[string]types.NamedIndices) error {
return parallelFlush(s.parallel, indexes, s.writeBatch)
}

Expand Down Expand Up @@ -88,7 +81,7 @@ func (s *FileBackend) writeBatch(b *batch) error {
return nil
}

func (s *FileBackend) FlushTransactions(indexes map[string]*TrieIndex) error {
func (s *FileBackend) FlushTransactions(indexes map[string]*types.TrieIndex) error {
// TODO: Parallelize this
for key, index := range indexes {
path := filepath.Join(s.dir, "tx", key)
Expand Down Expand Up @@ -125,7 +118,7 @@ func (s *FileBackend) FlushTransactions(indexes map[string]*TrieIndex) error {
return nil
}

func (s *FileBackend) Read(account string) (map[string]*CheckpointIndex, error) {
func (s *FileBackend) Read(account string) (types.NamedIndices, error) {
log.Debugf("Opening index: %s", account)
b, err := os.Open(filepath.Join(s.dir, account[:3], account))
if err != nil {
Expand Down Expand Up @@ -185,7 +178,7 @@ func (s *FileBackend) ReadAccounts() ([]string, error) {
return accounts, nil
}

func (s *FileBackend) ReadTransactions(prefix string) (*TrieIndex, error) {
func (s *FileBackend) ReadTransactions(prefix string) (*types.TrieIndex, error) {
log.Debugf("Opening index: %s", prefix)
b, err := os.Open(filepath.Join(s.dir, "tx", prefix))
if err != nil {
Expand All @@ -198,7 +191,7 @@ func (s *FileBackend) ReadTransactions(prefix string) (*TrieIndex, error) {
return nil, os.ErrNotExist
}
defer zr.Close()
var index TrieIndex
var index types.TrieIndex
_, err = index.ReadFrom(zr)
if err != nil {
log.Errorf("Unable to parse %s: %v", prefix, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"bytes"
"compress/gzip"
"io"

types "github.com/stellar/go/exp/lighthorizon/index/types"
)

func writeGzippedTo(w io.Writer, indexes map[string]*CheckpointIndex) (int64, error) {
func writeGzippedTo(w io.Writer, indexes types.NamedIndices) (int64, error) {
zw := gzip.NewWriter(w)

var n int64
Expand All @@ -28,12 +30,12 @@ func writeGzippedTo(w io.Writer, indexes map[string]*CheckpointIndex) (int64, er
return n, nil
}

func readGzippedFrom(r io.Reader) (map[string]*CheckpointIndex, int64, error) {
func readGzippedFrom(r io.Reader) (types.NamedIndices, int64, error) {
zr, err := gzip.NewReader(r)
if err != nil {
return nil, 0, err
}
indexes := map[string]*CheckpointIndex{}
indexes := types.NamedIndices{}
var buf bytes.Buffer
var n int64
for {
Expand All @@ -45,7 +47,7 @@ func readGzippedFrom(r io.Reader) (map[string]*CheckpointIndex, int64, error) {
return nil, n, err
}

ind, err := NewCheckpointIndexFromBytes(buf.Bytes())
ind, err := types.NewCheckpointIndexFromBytes(buf.Bytes())
if err != nil {
return nil, n, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"bytes"
"testing"

types "github.com/stellar/go/exp/lighthorizon/index/types"
"github.com/stretchr/testify/require"
)

func TestGzipWriteReadRoundtrip(t *testing.T) {
indexes := map[string]*CheckpointIndex{}
index := &CheckpointIndex{}
indexes := types.NamedIndices{}
index := &types.CheckpointIndex{}
index.SetActive(5)
indexes["A"] = index

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ import (
"sync/atomic"
"time"

types "github.com/stellar/go/exp/lighthorizon/index/types"
"github.com/stellar/go/support/log"
)

type batch struct {
account string
indexes map[string]*CheckpointIndex
indexes types.NamedIndices
}

type flushBatch func(b *batch) error

func parallelFlush(parallel uint32, allIndexes map[string]map[string]*CheckpointIndex, f flushBatch) error {
func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f flushBatch) error {
var wg sync.WaitGroup

batches := make(chan *batch, parallel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"

types "github.com/stellar/go/exp/lighthorizon/index/types"
)

const BUCKET = "horizon-index"
Expand All @@ -27,14 +29,6 @@ type S3Backend struct {
prefix string
}

func NewS3Store(awsConfig *aws.Config, prefix string, parallel uint32) (Store, error) {
backend, err := NewS3Backend(awsConfig, prefix, parallel)
if err != nil {
return nil, err
}
return NewStore(backend)
}

func NewS3Backend(awsConfig *aws.Config, prefix string, parallel uint32) (*S3Backend, error) {
s3Session, err := session.NewSession(awsConfig)
if err != nil {
Expand Down Expand Up @@ -72,7 +66,7 @@ func (s *S3Backend) FlushAccounts(accounts []string) error {
return nil
}

func (s *S3Backend) Flush(indexes map[string]map[string]*CheckpointIndex) error {
func (s *S3Backend) Flush(indexes map[string]types.NamedIndices) error {
return parallelFlush(s.parallel, indexes, s.writeBatch)
}

Expand All @@ -98,7 +92,7 @@ func (s *S3Backend) writeBatch(b *batch) error {
return nil
}

func (s *S3Backend) FlushTransactions(indexes map[string]*TrieIndex) error {
func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error {
// TODO: Parallelize this
var buf bytes.Buffer
for key, index := range indexes {
Expand Down Expand Up @@ -156,7 +150,7 @@ func (s *S3Backend) path(account string) string {
return filepath.Join(s.prefix, account[:10], account)
}

func (s *S3Backend) Read(account string) (map[string]*CheckpointIndex, error) {
func (s *S3Backend) Read(account string) (types.NamedIndices, error) {
// Check if index exists in S3
log.Debugf("Downloading index: %s", account)
var err error
Expand All @@ -179,7 +173,7 @@ func (s *S3Backend) Read(account string) (map[string]*CheckpointIndex, error) {
if n == 0 {
return nil, os.ErrNotExist
}
var indexes map[string]*CheckpointIndex
var indexes map[string]*types.CheckpointIndex
indexes, _, err = readGzippedFrom(bytes.NewReader(b.Bytes()))
if err != nil {
log.Errorf("Unable to parse %s: %v", account, err)
Expand All @@ -191,7 +185,7 @@ func (s *S3Backend) Read(account string) (map[string]*CheckpointIndex, error) {
return nil, err
}

func (s *S3Backend) ReadTransactions(prefix string) (*TrieIndex, error) {
func (s *S3Backend) ReadTransactions(prefix string) (*types.TrieIndex, error) {
// Check if index exists in S3
log.Debugf("Downloading index: %s", prefix)
b := &aws.WriteAtBuffer{}
Expand All @@ -216,7 +210,7 @@ func (s *S3Backend) ReadTransactions(prefix string) (*TrieIndex, error) {
}
defer zr.Close()

var index TrieIndex
var index types.TrieIndex
_, err = index.ReadFrom(zr)
if err != nil {
log.Errorf("Unable to parse %s: %v", prefix, err)
Expand Down
3 changes: 2 additions & 1 deletion exp/lighthorizon/index/cmd/batch/reduce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"

"github.com/stellar/go/exp/lighthorizon/index"
types "github.com/stellar/go/exp/lighthorizon/index/types"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
)
Expand Down Expand Up @@ -337,7 +338,7 @@ func (cfg *ReduceConfig) shouldProcessTx(txPrefix byte, routineIndex uint32) boo

// For every index that exists in `dest`, finds the corresponding index in
// `source` and merges it into `dest`'s version.
func mergeIndices(dest, source map[string]*index.CheckpointIndex) error {
func mergeIndices(dest, source map[string]*types.CheckpointIndex) error {
for name, index := range dest {
// The source doesn't contain this particular index.
//
Expand Down
18 changes: 18 additions & 0 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"path/filepath"

"github.com/aws/aws-sdk-go/aws"

backend "github.com/stellar/go/exp/lighthorizon/index/backend"
)

func Connect(backendUrl string) (Store, error) {
Expand All @@ -31,3 +33,19 @@ func Connect(backendUrl string) (Store, error) {
parsed.Scheme, backendUrl)
}
}

func NewFileStore(dir string, parallel uint32) (Store, error) {
backend, err := backend.NewFileBackend(dir, parallel)
if err != nil {
return nil, err
}
return NewStore(backend)
}

func NewS3Store(awsConfig *aws.Config, prefix string, parallel uint32) (Store, error) {
backend, err := backend.NewS3Backend(awsConfig, prefix, parallel)
if err != nil {
return nil, err
}
return NewStore(backend)
}
Loading