diff --git a/exp/lighthorizon/index/cmd/batch/reduce/main.go b/exp/lighthorizon/index/cmd/batch/reduce/main.go
index 01aac35317..5c96d071b0 100644
--- a/exp/lighthorizon/index/cmd/batch/reduce/main.go
+++ b/exp/lighthorizon/index/cmd/batch/reduce/main.go
@@ -5,245 +5,293 @@ import (
 	"fmt"
 	"hash/fnv"
 	"os"
+	"path/filepath"
 	"strconv"
 	"sync"
 
-	"github.com/aws/aws-sdk-go/aws"
 	"github.com/stellar/go/exp/lighthorizon/index"
+	"github.com/stellar/go/support/errors"
 	"github.com/stellar/go/support/log"
 )
 
-var (
-	// Should we use runtime.NumCPU() for a reasonable default?
-	parallel = uint32(16)
+const (
+	ACCOUNT_FLUSH_FREQUENCY = 200
+	// arbitrary default, should we use runtime.NumCPU()?
+	DEFAULT_WORKER_COUNT = 2
 )
 
-func main() {
-	log.SetLevel(log.InfoLevel)
-	jobIndexString := os.Getenv("AWS_BATCH_JOB_ARRAY_INDEX")
-	if jobIndexString == "" {
-		panic("AWS_BATCH_JOB_ARRAY_INDEX env required")
+type ReduceConfig struct {
+	JobIndex        uint32
+	MapJobCount     uint32
+	ReduceJobCount  uint32
+	IndexTarget     string
+	IndexRootSource string
+
+	Workers uint32
+}
+
+func ReduceConfigFromEnvironment() (*ReduceConfig, error) {
+	const (
+		mapJobsEnv         = "MAP_JOB_COUNT"
+		reduceJobsEnv      = "REDUCE_JOB_COUNT"
+		workerCountEnv     = "WORKER_COUNT"
+		jobIndexEnv        = "AWS_BATCH_JOB_ARRAY_INDEX"
+		indexRootSourceEnv = "INDEX_SOURCE_ROOT"
+		indexTargetEnv     = "INDEX_TARGET"
+	)
+
+	jobIndex, err := strconv.ParseUint(os.Getenv(jobIndexEnv), 10, 32)
+	if err != nil {
+		return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv)
+	}
+	mapJobCount, err := strconv.ParseUint(os.Getenv(mapJobsEnv), 10, 32)
+	if err != nil {
+		return nil, errors.Wrap(err, "invalid parameter "+mapJobsEnv)
+	}
+	reduceJobCount, err := strconv.ParseUint(os.Getenv(reduceJobsEnv), 10, 32)
+	if err != nil {
+		return nil, errors.Wrap(err, "invalid parameter "+reduceJobsEnv)
 	}
 
-	mapJobsString := os.Getenv("MAP_JOBS")
-	if mapJobsString == "" {
-		panic("MAP_JOBS env required")
+	workersStr := os.Getenv(workerCountEnv)
+	if workersStr == "" {
+		workersStr = strconv.FormatUint(DEFAULT_WORKER_COUNT, 10)
+	}
+	workers, err := strconv.ParseUint(workersStr, 10, 32)
+	if err != nil {
+		return nil, errors.Wrap(err, "invalid parameter "+workerCountEnv)
 	}
 
-	reduceJobsString := os.Getenv("REDUCE_JOBS")
-	if mapJobsString == "" {
-		panic("REDUCE_JOBS env required")
+	indexTarget := os.Getenv(indexTargetEnv)
+	if indexTarget == "" {
+		return nil, errors.New("required parameter missing " + indexTargetEnv)
 	}
 
-	jobIndex, err := strconv.ParseUint(jobIndexString, 10, 64)
-	if err != nil {
-		panic(err)
+	indexRootSource := os.Getenv(indexRootSourceEnv)
+	if indexRootSource == "" {
+		return nil, errors.New("required parameter missing " + indexRootSourceEnv)
 	}
 
-	mapJobs, err := strconv.ParseUint(mapJobsString, 10, 64)
+	return &ReduceConfig{
+		JobIndex:        uint32(jobIndex),
+		MapJobCount:     uint32(mapJobCount),
+		ReduceJobCount:  uint32(reduceJobCount),
+		Workers:         uint32(workers),
+		IndexTarget:     indexTarget,
+		IndexRootSource: indexRootSource,
+	}, nil
+}
+
+func main() {
+	log.SetLevel(log.InfoLevel)
+
+	config, err := ReduceConfigFromEnvironment()
 	if err != nil {
 		panic(err)
 	}
 
-	reduceJobs, err := strconv.ParseUint(reduceJobsString, 10, 64)
+	log.Infof("Connecting to %s", config.IndexTarget)
+	finalIndexStore, err := index.Connect(config.IndexTarget)
 	if err != nil {
-		panic(err)
+		panic(errors.Wrapf(err, "failed to connect to indices at %s",
+			config.IndexTarget))
 	}
 
-	var (
-		mutex        sync.Mutex
-		doneAccounts map[string]struct{} = map[string]struct{}{}
-	)
-
-	indexStore, err := index.NewS3Store(&aws.Config{Region: aws.String("us-east-1")}, "", parallel)
-	if err != nil {
-		panic(err)
+	if err := mergeAllIndices(finalIndexStore, config); err != nil {
+		panic(errors.Wrap(err, "failed to merge indices"))
 	}
+}
+
+func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
+	doneAccounts := NewSafeStringSet()
+	for i := uint32(0); i < config.MapJobCount; i++ {
+		logger := log.WithField("job", i)
+
+		url := filepath.Join(config.IndexRootSource, "job_"+strconv.FormatUint(uint64(i), 10))
+		logger.Infof("Connecting to %s", url)
 
-	for i := uint64(0); i < mapJobs; i++ {
-		outerJobStore, err := index.NewS3Store(
-			&aws.Config{Region: aws.String("us-east-1")},
-			fmt.Sprintf("job_%d", i),
-			parallel,
-		)
+		outerJobStore, err := index.Connect(url)
 		if err != nil {
-			panic(err)
+			return errors.Wrapf(err, "failed to connect to indices at %s", url)
 		}
 
 		accounts, err := outerJobStore.ReadAccounts()
-		if err != nil {
-			// TODO: in final version this should be critical error, now just skip it
-			if err == os.ErrNotExist {
-				log.Errorf("Job %d is unavailable - TODO fix", i)
-				continue
-			}
-			panic(err)
+		// TODO: in final version this should be critical error, now just skip it
+		if os.IsNotExist(err) {
+			logger.Errorf("accounts file not found (TODO!)")
+			continue
+		} else if err != nil {
+			return errors.Wrapf(err, "failed to read accounts for job %d", i)
 		}
 
-		log.Info("Outer job ", i, " accounts ", len(accounts))
-
-		ch := make(chan string, parallel)
-		go func() {
-			for _, account := range accounts {
-				mutex.Lock()
-				_, ok := doneAccounts[account]
-				mutex.Unlock()
-				if ok {
-					// Account index already merged in the previous outer job
-					continue
-				}
-				ch <- account
-			}
-			close(ch)
-		}()
+		logger.Infof("Processing %d accounts with %d workers",
+			len(accounts), config.Workers)
 
-		var wg sync.WaitGroup
-		wg.Add(int(parallel))
-		for j := uint32(0); j < parallel; j++ {
-			go func(routine uint32) {
-				defer wg.Done()
-				var skipped, processed uint64
-				for account := range ch {
-					if (processed+skipped)%1000 == 0 {
-						log.Infof(
-							"outer: %d, routine: %d, processed: %d, skipped: %d, all account in outer job: %d\n",
-							i, routine, processed, skipped, len(accounts),
-						)
-					}
+		workQueues := make([]chan string, config.Workers)
+		for i := range workQueues {
+			workQueues[i] = make(chan string, 1)
+		}
 
-					hash := fnv.New64a()
-					_, err = hash.Write([]byte(account))
-					if err != nil {
-						panic(err)
+		for idx, queue := range workQueues {
+			go (func(index uint32, queue chan string) {
+				for _, account := range accounts {
+					// Account index already merged in the previous outer job?
+					if doneAccounts.Contains(account) {
+						continue
 					}
-					hashSum := hash.Sum64()
-					hashLeft := uint32(hashSum >> 4)
-					hashRight := uint32(0x0000ffff & hashSum)
-
-					if hashRight%uint32(reduceJobs) != uint32(jobIndex) {
-						// This job is not merging this account
-						skipped++
+					// Account doesn't belong in this work queue?
+					if !config.shouldProcessAccount(account, index) {
 						continue
 					}
-					if hashLeft%uint32(parallel) != uint32(routine) {
-						// This go routine is not merging this account
-						skipped++
-						continue
+					queue <- account
+				}
+
+				close(queue)
+			})(uint32(idx), queue)
+		}
+
+		// TODO: errgroup.WithContext(ctx)
+		var wg sync.WaitGroup
+		wg.Add(int(config.Workers))
+		for j := uint32(0); j < config.Workers; j++ {
+			go func(routineIndex uint32) {
+				defer wg.Done()
+				logger := logger.
+					WithField("worker", routineIndex).
+					WithField("total", len(accounts))
+				logger.Info("Started worker")
+
+				var accountsProcessed, accountsSkipped uint64
+				for account := range workQueues[routineIndex] {
+					logger.Infof("Account: %s", account)
+					if (accountsProcessed+accountsSkipped)%97 == 0 {
+						logger.
+							WithField("indexed", accountsProcessed).
+							WithField("skipped", accountsSkipped).
+							Infof("Processed %d/%d accounts",
+								accountsProcessed+accountsSkipped, len(accounts))
 					}
 
-					outerAccountIndexes, err := outerJobStore.Read(account)
-					if err != nil {
-						// TODO: in final version this should be critical error, now just skip it
-						if err == os.ErrNotExist {
-							log.Errorf("Account %s is unavailable - TODO fix", account)
-							continue
-						}
+					logger.Infof("Reading index for account: %s", account)
+
+					// First, open the "final merged indices" at the root level
+					// for this account.
+					mergedIndices, err := outerJobStore.Read(account)
+
+					// TODO: in final version this should be critical error, now just skip it
+					if os.IsNotExist(err) {
+						logger.Errorf("Account %s is unavailable - TODO fix", account)
+						continue
+					} else if err != nil {
 						panic(err)
 					}
 
-					for k := uint64(i + 1); k < mapJobs; k++ {
-						innerJobStore, err := index.NewS3Store(
-							&aws.Config{Region: aws.String("us-east-1")},
-							fmt.Sprintf("job_%d", k),
-							parallel,
-						)
+					// Then, iterate through all of the job folders and merge
+					// indices from all jobs that touched this account.
+					for k := uint32(0); k < config.MapJobCount; k++ {
+						url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))
+
+						// FIXME: This could probably come from a pool. Every
+						// worker needs to have a connection to every index
+						// store, so there's no reason to re-open these for each
+						// inner loop.
+						innerJobStore, err := index.Connect(url)
 						if err != nil {
+							logger.WithError(err).
+								Errorf("Failed to open index at %s", url)
 							panic(err)
 						}
 
-						innerAccountIndexes, err := innerJobStore.Read(account)
-						if err != nil {
-							if err == os.ErrNotExist {
-								continue
-							}
+						jobIndices, err := innerJobStore.Read(account)
+						// This job never touched this account; skip.
+						if os.IsNotExist(err) {
+							continue
+						} else if err != nil {
+							logger.WithError(err).
+								Errorf("Failed to read index for %s", account)
 							panic(err)
 						}
 
-						for name, index := range outerAccountIndexes {
-							if innerAccountIndexes[name] == nil {
-								continue
-							}
-							err := index.Merge(innerAccountIndexes[name])
-							if err != nil {
-								panic(err)
-							}
+						if err := mergeIndices(mergedIndices, jobIndices); err != nil {
+							logger.WithError(err).
								Errorf("Merge failure for index at %s", url)
+							panic(err)
 						}
 					}
 
-					// Save merged index
-					indexStore.AddParticipantToIndexesNoBackend(account, outerAccountIndexes)
+					// Finally, save the merged index.
+					finalIndexStore.AddParticipantToIndexesNoBackend(account, mergedIndices)
 
-					// Mark account as done
-					mutex.Lock()
-					doneAccounts[account] = struct{}{}
-					mutex.Unlock()
-					processed++
+					// Mark this account for other workers to ignore.
+					doneAccounts.Add(account)
+					accountsProcessed++
+					logger = logger.WithField("processed", accountsProcessed)
 
-					if processed%200 == 0 {
-						log.Infof("Flushing %d, processed %d", routine, processed)
-						err = indexStore.Flush()
-						if err != nil {
+					// Periodically flush to disk to save memory.
+					if accountsProcessed%ACCOUNT_FLUSH_FREQUENCY == 0 {
+						logger.Infof("Flushing indexed accounts.")
+						if err = finalIndexStore.Flush(); err != nil {
+							logger.WithError(err).Errorf("Flush error.")
 							panic(err)
 						}
 					}
 				}
 
-				log.Infof("Flushing Accounts %d, processed %d", routine, processed)
-				err = indexStore.Flush()
-				if err != nil {
+				logger.Infof("Final account flush.")
+				if err = finalIndexStore.Flush(); err != nil {
+					logger.WithError(err).Errorf("Flush error.")
 					panic(err)
 				}
 
 				// Merge the transaction indexes
 				// There's 256 files, (one for each first byte of the txn hash)
-				processed = 0
+				var transactionsProcessed, transactionsSkipped uint64
+				logger = logger.
+					WithField("indexed", transactionsProcessed).
+					WithField("skipped", transactionsSkipped)
+
 				for i := byte(0x00); i < 0xff; i++ {
-					hashLeft := uint32(i >> 4)
-					hashRight := uint32(0x0f & i)
-					if hashRight%uint32(reduceJobs) != uint32(jobIndex) {
-						// This job is not merging this prefix
-						skipped++
-						continue
+					if i%97 == 0 {
+						logger.Infof("%d transactions processed (%d skipped)",
+							transactionsProcessed, transactionsSkipped)
 					}
 
-					if hashLeft%uint32(parallel) != uint32(routine) {
-						// This go routine is not merging this prefix
-						skipped++
+					if !config.shouldProcessTx(i, routineIndex) {
+						transactionsSkipped++
 						continue
 					}
-					processed++
+					transactionsProcessed++
 
 					prefix := hex.EncodeToString([]byte{i})
 
-					for k := uint64(0); k < mapJobs; k++ {
-						innerJobStore, err := index.NewS3Store(
-							&aws.Config{Region: aws.String("us-east-1")},
-							fmt.Sprintf("job_%d", k),
-							parallel,
-						)
+					for k := uint32(0); k < config.MapJobCount; k++ {
+						url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))
+						innerJobStore, err := index.Connect(url)
 						if err != nil {
+							logger.WithError(err).Errorf("Failed to open index at %s", url)
 							panic(err)
 						}
 
 						innerTxnIndexes, err := innerJobStore.ReadTransactions(prefix)
-						if err != nil {
-							if err == os.ErrNotExist {
-								continue
-							}
+						if os.IsNotExist(err) {
+							continue
+						} else if err != nil {
+							logger.WithError(err).Errorf("Error reading tx prefix %s", prefix)
 							panic(err)
 						}
 
-						if err := indexStore.MergeTransactions(prefix, innerTxnIndexes); err != nil {
+						if err := finalIndexStore.MergeTransactions(prefix, innerTxnIndexes); err != nil {
+							logger.WithError(err).Errorf("Error merging txs at prefix %s", prefix)
 							panic(err)
 						}
 					}
 				}
 
-				log.Infof("Flushing Transactions %d, processed %d", routine, processed)
-				err = indexStore.Flush()
-				if err != nil {
+				logger.Infof("Final transaction flush (%d processed)", transactionsProcessed)
+				if err = finalIndexStore.Flush(); err != nil {
+					logger.Errorf("Error flushing transactions: %v", err)
 					panic(err)
 				}
 			}(j)
@@ -251,4 +299,60 @@ func main() {
 
 		wg.Wait()
 	}
+
+	return nil
+}
+
+func (cfg *ReduceConfig) shouldProcessAccount(account string, routineIndex uint32) bool {
+	hash := fnv.New64a()
+
+	// Docs state (https://pkg.go.dev/hash#Hash) that Write will never error.
+	hash.Write([]byte(account))
+	digest := uint32(hash.Sum64()) // discard top 32 bits
+
+	leftHalf := digest >> 16
+	rightHalf := digest & 0x0000FFFF
+
+	log.WithField("worker", routineIndex).
+		WithField("account", account).
+		Debugf("Hash: %d (left=%d, right=%d)", digest, leftHalf, rightHalf)
+
+	// Because the digest is basically a random number (given a good hash
+	// function), its remainders w.r.t. the indices will distribute the work
+	// fairly (and deterministically).
+	return leftHalf%cfg.ReduceJobCount == cfg.JobIndex &&
+		rightHalf%cfg.Workers == routineIndex
+}
+
+func (cfg *ReduceConfig) shouldProcessTx(txPrefix byte, routineIndex uint32) bool {
+	hashLeft := uint32(txPrefix >> 4)
+	hashRight := uint32(txPrefix & 0x0F)
+
+	// Because the transaction hash (and thus the first byte or "prefix") is a
+	// random value, its remainders w.r.t. the indices will distribute the work
+	// fairly (and deterministically).
+	return hashRight%cfg.ReduceJobCount == cfg.JobIndex &&
+		hashLeft%cfg.Workers == routineIndex
+}
+
+// For every index that exists in `dest`, finds the corresponding index in
+// `source` and merges it into `dest`'s version.
+func mergeIndices(dest, source map[string]*index.CheckpointIndex) error {
+	for name, index := range dest {
+		// The source doesn't contain this particular index.
+		//
+		// This probably shouldn't happen, since during the Map step, there's no
+		// way to choose which indices you want, but, strictly-speaking, it's
+		// not an error, so we can just move on.
+		innerIndices, ok := source[name]
+		if !ok || innerIndices == nil {
+			continue
+		}
+
+		if err := index.Merge(innerIndices); err != nil {
+			return errors.Wrapf(err, "failed to merge index for %s", name)
+		}
+	}
+
+	return nil
 }
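
The two `shouldProcess*` helpers above make the fan-out deterministic: the FNV-1a digest of an account (or the two nibbles of a transaction prefix) is split in half, with one half selecting the reduce job and the other selecting the worker, so each item is claimed by exactly one (job, worker) pair. Below is a minimal sketch of how that exclusivity could be checked; the test name, config values, and account strings are hypothetical, and it assumes a `_test.go` file alongside `main.go` in the same package.

```go
package main

import "testing"

// Hypothetical property check: for a given ReduceConfig shape, every account
// must be claimed by exactly one (reduce job, worker) pair.
func TestAccountPartitionIsExclusive(t *testing.T) {
	base := ReduceConfig{ReduceJobCount: 4, Workers: 2}

	for _, account := range []string{"GEXAMPLEACCOUNTONE", "GEXAMPLEACCOUNTTWO"} {
		claims := 0
		for job := uint32(0); job < base.ReduceJobCount; job++ {
			for worker := uint32(0); worker < base.Workers; worker++ {
				cfg := base
				cfg.JobIndex = job
				if cfg.shouldProcessAccount(account, worker) {
					claims++
				}
			}
		}

		if claims != 1 {
			t.Errorf("account %q claimed %d times, want exactly 1", account, claims)
		}
	}
}
```
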
diff --git a/exp/lighthorizon/index/cmd/batch/reduce/set.go b/exp/lighthorizon/index/cmd/batch/reduce/set.go
new file mode 100644
index 0000000000..e57b88775b
--- /dev/null
+++ b/exp/lighthorizon/index/cmd/batch/reduce/set.go
@@ -0,0 +1,29 @@
+package main
+
+import "sync"
+
+// SafeStringSet is a simple thread-safe set.
+type SafeStringSet struct {
+	lock sync.RWMutex
+	set  map[string]struct{}
+}
+
+func NewSafeStringSet() *SafeStringSet {
+	return &SafeStringSet{
+		lock: sync.RWMutex{},
+		set:  map[string]struct{}{},
+	}
+}
+
+func (set *SafeStringSet) Contains(key string) bool {
+	defer set.lock.RUnlock()
+	set.lock.RLock()
+	_, ok := set.set[key]
+	return ok
+}
+
+func (set *SafeStringSet) Add(key string) {
+	defer set.lock.Unlock()
+	set.lock.Lock()
+	set.set[key] = struct{}{}
+}
diff --git a/exp/lighthorizon/index/connect.go b/exp/lighthorizon/index/connect.go
index 7e5b522539..4620046f29 100644
--- a/exp/lighthorizon/index/connect.go
+++ b/exp/lighthorizon/index/connect.go
@@ -1,7 +1,7 @@
 package index
 
 import (
-	"errors"
+	"fmt"
 	"net/url"
 	"path/filepath"
 
@@ -20,12 +20,14 @@ func Connect(backendUrl string) (Store, error) {
 		if region := query.Get("region"); region != "" {
 			config.Region = aws.String(region)
 		}
+
 		return NewS3Store(config, parsed.Path, 20)
 	case "file":
 		return NewFileStore(filepath.Join(parsed.Host, parsed.Path), 20)
 	default:
-		return nil, errors.New("unknown URL scheme: '" + parsed.Scheme + "'")
+		return nil, fmt.Errorf("unknown URL scheme: '%s' (from %s)",
+			parsed.Scheme, backendUrl)
 	}
 }
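
For reference, `index.Connect` is what the batch binaries now use to open index stores by URL. The sketch below shows the two schemes the `switch` above handles; the paths and bucket name are illustrative only (the S3 bucket/prefix wiring lives in the part of `Connect` not shown in this hunk), the `region` query parameter is the one parsed above, and the helper itself is hypothetical, assumed to sit next to `main.go` in the reduce package.

```go
package main

import "github.com/stellar/go/exp/lighthorizon/index"

func connectExamples() error {
	// Filesystem-backed store: host and path are joined, so this resolves to
	// the local directory /tmp/indices/job_0.
	local, err := index.Connect("file:///tmp/indices/job_0")
	if err != nil {
		return err
	}
	_ = local

	// S3-backed store: the optional "region" query parameter overrides the
	// default AWS region; the bucket and prefix here are made up.
	remote, err := index.Connect("s3://example-bucket/job_0?region=us-east-1")
	if err != nil {
		return err
	}
	_ = remote

	// Any other scheme now fails with the more descriptive fmt.Errorf message.
	_, err = index.Connect("ftp://nope")
	return err
}
```
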
diff --git a/exp/lighthorizon/index/store.go b/exp/lighthorizon/index/store.go
index 05f19a803b..640af10f76 100644
--- a/exp/lighthorizon/index/store.go
+++ b/exp/lighthorizon/index/store.go
@@ -60,12 +60,7 @@ func (s *store) accounts() []string {
 func (s *store) FlushAccounts() error {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
-
-	if err := s.backend.FlushAccounts(s.accounts()); err != nil {
-		return err
-	}
-
-	return nil
+	return s.backend.FlushAccounts(s.accounts())
 }
 
 func (s *store) Read(account string) (map[string]*CheckpointIndex, error) {
@@ -88,6 +83,9 @@ func (s *store) MergeTransactions(prefix string, other *TrieIndex) error {
 	if err := index.Merge(other); err != nil {
 		return err
 	}
+
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
 	s.txIndexes[prefix] = index
 	return nil
 }
@@ -100,6 +98,10 @@ func (s *store) Flush() error {
 		return err
 	}
 
+	if err := s.backend.FlushAccounts(s.accounts()); err != nil {
+		return err
+	}
+
 	// clear indexes to save memory
 	s.indexes = map[string]map[string]*CheckpointIndex{}
 
@@ -159,6 +161,7 @@ func (s *store) AddParticipantsToIndexesNoBackend(checkpoint uint32, index strin
 			return err
 		}
 	}
+
 	return nil
 }
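
Tying the configuration together: the reduce step is driven entirely by the six environment variables read by `ReduceConfigFromEnvironment`, and the source/target values can be any URL `index.Connect` accepts. The helper below is hypothetical and every value is illustrative; in a real deployment AWS Batch injects `AWS_BATCH_JOB_ARRAY_INDEX` itself and the remaining variables come from the job definition.

```go
package main

import (
	"fmt"
	"os"
)

// Hypothetical helper: wire up the environment the way an AWS Batch job
// definition (or a local test run) would, then parse it.
func exampleReduceConfig() (*ReduceConfig, error) {
	os.Setenv("AWS_BATCH_JOB_ARRAY_INDEX", "0")           // which reduce job this is
	os.Setenv("MAP_JOB_COUNT", "4")                       // job_0..job_3 folders from the map step
	os.Setenv("REDUCE_JOB_COUNT", "2")                    // how many reduce jobs split the accounts
	os.Setenv("WORKER_COUNT", "2")                        // optional; defaults to DEFAULT_WORKER_COUNT
	os.Setenv("INDEX_SOURCE_ROOT", "file:///tmp/indices") // where the job_N folders live
	os.Setenv("INDEX_TARGET", "file:///tmp/merged")       // where merged indices are written

	cfg, err := ReduceConfigFromEnvironment()
	if err != nil {
		return nil, err
	}

	fmt.Printf("reduce job %d of %d with %d workers\n",
		cfg.JobIndex, cfg.ReduceJobCount, cfg.Workers)
	return cfg, nil
}
```
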