Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - atx syncer that persists results #5599

Closed
wants to merge 11 commits into from
7 changes: 7 additions & 0 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/accounts"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/atxsync"
"github.com/spacemeshos/go-spacemesh/sql/localsql"
"github.com/spacemeshos/go-spacemesh/sql/localsql/nipost"
"github.com/spacemeshos/go-spacemesh/sql/poets"
Expand Down Expand Up @@ -115,6 +116,12 @@
return nil, fmt.Errorf("open old local database: %w", err)
}
defer localDB.Close()
logger.With().Info("clearing atx sync metadata from local database")
if err := localDB.WithTx(ctx, func(tx *sql.Tx) error {
return atxsync.Clear(tx)
}); err != nil {
return nil, fmt.Errorf("clear atxsync: %w", err)

Check warning on line 123 in checkpoint/recovery.go

View check run for this annotation

Codecov / codecov/patch

checkpoint/recovery.go#L123

Added line #L123 was not covered by tests
}
preserve, err := RecoverWithDb(ctx, logger, db, localDB, fs, cfg)
switch {
case errors.Is(err, ErrCheckpointNotFound):
Expand Down
2 changes: 2 additions & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare3/eligibility"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
timeConfig "github.com/spacemeshos/go-spacemesh/timesync/config"
"github.com/spacemeshos/go-spacemesh/tortoise"
)
Expand Down Expand Up @@ -186,6 +187,7 @@ func MainnetConfig() Config {
OutOfSyncThresholdLayers: 36, // 3h
DisableMeshAgreement: true,
DisableAtxReconciliation: true,
AtxSync: atxsync.DefaultConfig(),
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
1 change: 1 addition & 0 deletions config/presets/fastnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func fastnet() config.Config {
conf.LayerDuration = 15 * time.Second
conf.Sync.Interval = 5 * time.Second
conf.Sync.GossipDuration = 10 * time.Second
conf.Sync.AtxSync.EpochInfoInterval = 20 * time.Second
conf.LayersPerEpoch = 4
conf.RegossipAtxInterval = 30 * time.Second

Expand Down
4 changes: 3 additions & 1 deletion config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare3/eligibility"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
timeConfig "github.com/spacemeshos/go-spacemesh/timesync/config"
"github.com/spacemeshos/go-spacemesh/tortoise"
)
Expand Down Expand Up @@ -137,10 +138,11 @@ func testnet() config.Config {
LOGGING: config.DefaultLoggingConfig(),
Sync: syncer.Config{
Interval: time.Minute,
EpochEndFraction: 0.8,
EpochEndFraction: 0.5,
MaxStaleDuration: time.Hour,
GossipDuration: 50 * time.Second,
OutOfSyncThresholdLayers: 10,
AtxSync: atxsync.DefaultConfig(),
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
6 changes: 3 additions & 3 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ const (
)

var (
// errExceedMaxRetries is returned when MaxRetriesForRequest attempts have been made to fetch
// ErrExceedMaxRetries is returned when MaxRetriesForRequest attempts have been made to fetch
// data for a hash and failed.
errExceedMaxRetries = errors.New("fetch failed after max retries for request")
ErrExceedMaxRetries = errors.New("fetch failed after max retries for request")

errValidatorsNotSet = errors.New("validators not set")
)
Expand Down Expand Up @@ -534,7 +534,7 @@ func (f *Fetch) failAfterRetry(hash types.Hash32) {
log.Stringer("hash", req.hash),
log.Int("retries", req.retries),
)
req.promise.err = errExceedMaxRetries
req.promise.err = ErrExceedMaxRetries
close(req.promise.completed)
} else {
// put the request back to the unprocessed list
Expand Down
47 changes: 40 additions & 7 deletions fetch/mesh_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"context"
"errors"
"fmt"
"sync/atomic"
"strings"
"sync"

"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -66,8 +67,11 @@
pendingMetric := pendingHashReqs.WithLabelValues(string(hint))
pendingMetric.Add(float64(len(hashes)))

var eg errgroup.Group
var failed atomic.Uint64
var (
eg errgroup.Group
mu sync.Mutex
bfailure = BatchError{Errors: map[types.Hash32]error{}}
)
for i, hash := range hashes {
if err := options.limiter.Acquire(ctx, 1); err != nil {
pendingMetric.Add(float64(i - len(hashes)))
Expand Down Expand Up @@ -97,21 +101,24 @@
options.limiter.Release(1)
pendingMetric.Add(-1)
if p.err != nil {
f.logger.Debug("failed to get hash",
f.logger.With().Debug("failed to get hash",
log.String("hint", string(hint)),
log.Stringer("hash", h),
log.Err(p.err),
)
failed.Add(1)

mu.Lock()
bfailure.Add(h, p.err)
mu.Unlock()
}
return nil
}
})
}

eg.Wait()
if failed.Load() > 0 {
return fmt.Errorf("failed to fetch %d hashes out of %d", failed.Load(), len(hashes))
if !bfailure.Empty() {
return &bfailure
}
return nil
}
Expand Down Expand Up @@ -313,3 +320,29 @@
}
return nil, fmt.Errorf("failed to get cert %v/%s from %d peers: %w", lid, bid.String(), len(peers), ctx.Err())
}

// BatchError collects the failures of a batch of hash requests, keyed by the
// hash that failed. It is used as an error through its Error method.
type BatchError struct {
	// Errors maps each failed hash to the error recorded for it.
	Errors map[types.Hash32]error
}

// Empty reports whether no failures have been recorded in the batch.
// A nil Errors map counts as empty.
func (b *BatchError) Empty() bool {
	return len(b.Errors) == 0
}

// Add records err as the failure for id, replacing any error previously
// stored under the same id. The Errors map is allocated on first use, so a
// zero-value BatchError can be added to directly.
//
// Add does no locking of its own; callers that add from several goroutines
// must serialize access themselves.
func (b *BatchError) Add(id types.Hash32, err error) {
	if b.Errors != nil {
		b.Errors[id] = err
		return
	}
	b.Errors = map[types.Hash32]error{id: err}
}

func (b *BatchError) Error() string {
var builder strings.Builder
builder.WriteString("batch failure: ")
for hash, err := range b.Errors {
builder.WriteString(hash.ShortString())
builder.WriteString("=")
builder.WriteString(err.Error())

Check warning on line 345 in fetch/mesh_data.go

View check run for this annotation

Codecov / codecov/patch

fetch/mesh_data.go#L339-L345

Added lines #L339 - L345 were not covered by tests
}
return builder.String()

Check warning on line 347 in fetch/mesh_data.go

View check run for this annotation

Codecov / codecov/patch

fetch/mesh_data.go#L347

Added line #L347 was not covered by tests
}
8 changes: 8 additions & 0 deletions log/zap.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@
return zap.Inline(&marshalledContext{Context: ctx})
}

func NiceZapError(err error) zap.Field {
var loggable ObjectMarshaller
if errors.As(err, &loggable) {
return zap.Inline(loggable)

Check warning on line 204 in log/zap.go

View check run for this annotation

Codecov / codecov/patch

log/zap.go#L204

Added line #L204 was not covered by tests
}
return zap.Error(err)
}

// Any builds a field for key and value with zap.Any and converts it to this
// package's Field type.
func Any(key string, value any) Field {
	field := zap.Any(key, value)
	return Field(field)
}
Expand Down
4 changes: 4 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,10 @@ func (app *App) initServices(ctx context.Context) error {
fetcher,
patrol,
app.certifier,
atxsync.New(fetcher, app.clock, app.db, app.localDB,
atxsync.WithConfig(app.Config.Sync.AtxSync),
atxsync.WithLogger(app.syncLogger.Zap()),
),
syncer.WithConfig(syncerConf),
syncer.WithLogger(app.syncLogger),
)
Expand Down
2 changes: 1 addition & 1 deletion p2p/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (s *Server) Request(ctx context.Context, pid peer.ID, req []byte) ([]byte,
s.metrics.clientServerError.Inc()
s.metrics.clientLatency.Observe(took)
}
return nil, errors.New(data.Error)
return nil, fmt.Errorf("peer error: %s", data.Error)
case s.metrics != nil:
s.metrics.clientSucceeded.Inc()
s.metrics.clientLatency.Observe(took)
Expand Down
3 changes: 2 additions & 1 deletion p2p/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func TestServer(t *testing.T) {
})
t.Run("ReceiveError", func(t *testing.T) {
_, err := client.Request(ctx, mesh.Hosts()[2].ID(), request)
require.Equal(t, err, testErr)
require.ErrorContains(t, err, "peer error")
require.ErrorContains(t, err, testErr.Error())
})
t.Run("DialError", func(t *testing.T) {
_, err := client.Request(ctx, mesh.Hosts()[2].ID(), request)
Expand Down
94 changes: 94 additions & 0 deletions sql/atxsync/atxsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package atxsync

import (
"fmt"
"time"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
)

func GetSyncState(db sql.Executor, epoch types.EpochID) (map[types.ATXID]int, error) {
states := map[types.ATXID]int{}
_, err := db.Exec("select id, requests from atx_sync_state where epoch = ?1",
func(stmt *sql.Statement) {
stmt.BindInt64(1, int64(epoch))
}, func(stmt *sql.Statement) bool {
var id types.ATXID
stmt.ColumnBytes(0, id[:])
states[id] = int(stmt.ColumnInt64(1))
return true
})
if err != nil {
return nil, fmt.Errorf("select synced atx ids for epoch failed %v: %w", epoch, err)

Check warning on line 23 in sql/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

sql/atxsync/atxsync.go#L23

Added line #L23 was not covered by tests
}
return states, nil
}

func SaveSyncState(db sql.Executor, epoch types.EpochID, states map[types.ATXID]int, max int) error {
for id, requests := range states {
var err error
if requests == max {
_, err = db.Exec(`delete from atx_sync_state where epoch = ?1 and id = ?2`, func(stmt *sql.Statement) {
stmt.BindInt64(1, int64(epoch))
stmt.BindBytes(2, id[:])
}, nil)
} else {
_, err = db.Exec(`insert into atx_sync_state
(epoch, id, requests) values (?1, ?2, ?3)
on conflict(epoch, id) do update set requests = ?3;`,
func(stmt *sql.Statement) {
stmt.BindInt64(1, int64(epoch))
stmt.BindBytes(2, id[:])
stmt.BindInt64(3, int64(requests))
}, nil)
}
if err != nil {
return fmt.Errorf("insert synced atx id %v/%v failed: %w", epoch, id.ShortString(), err)

Check warning on line 47 in sql/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

sql/atxsync/atxsync.go#L47

Added line #L47 was not covered by tests
}
}
return nil
}

func SaveRequestTime(db sql.Executor, epoch types.EpochID, timestamp time.Time) error {
_, err := db.Exec(`insert into atx_sync_requests
(epoch, timestamp) values (?1, ?2)
on conflict(epoch) do update set timestamp = ?2;`,
func(stmt *sql.Statement) {
stmt.BindInt64(1, int64(epoch))
stmt.BindInt64(2, timestamp.Unix())
}, nil)
if err != nil {
return fmt.Errorf("insert request time for epoch %v failed: %w", epoch, err)

Check warning on line 62 in sql/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

sql/atxsync/atxsync.go#L62

Added line #L62 was not covered by tests
}
return nil
}

func GetRequestTime(db sql.Executor, epoch types.EpochID) (time.Time, error) {
var timestamp time.Time
rows, err := db.Exec("select timestamp from atx_sync_requests where epoch = ?1",
func(stmt *sql.Statement) {
stmt.BindInt64(1, int64(epoch))
}, func(stmt *sql.Statement) bool {
timestamp = time.Unix(stmt.ColumnInt64(0), 0)
return true
})
if err != nil {
return time.Time{}, fmt.Errorf("select request time for epoch %v failed: %w", epoch, err)

Check warning on line 77 in sql/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

sql/atxsync/atxsync.go#L77

Added line #L77 was not covered by tests
} else if rows == 0 {
return time.Time{}, fmt.Errorf("%w: no request time for epoch %v", sql.ErrNotFound, epoch)
}
return timestamp, nil
}

func Clear(db sql.Executor) error {
_, err := db.Exec(`delete from atx_sync_state`, nil, nil)
if err != nil {
return fmt.Errorf("clear atx sync state failed: %w", err)

Check warning on line 87 in sql/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

sql/atxsync/atxsync.go#L87

Added line #L87 was not covered by tests
}
_, err = db.Exec(`delete from atx_sync_requests`, nil, nil)
if err != nil {
return fmt.Errorf("clear atx sync requests failed: %w", err)

Check warning on line 91 in sql/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

sql/atxsync/atxsync.go#L91

Added line #L91 was not covered by tests
}
return nil
}
Loading