Skip to content

Commit

Permalink
Try #5599:
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemesh-bors[bot] authored Mar 7, 2024
2 parents efea4b2 + d9658d2 commit f938a53
Show file tree
Hide file tree
Showing 19 changed files with 1,207 additions and 76 deletions.
7 changes: 7 additions & 0 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/accounts"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/atxsync"
"github.com/spacemeshos/go-spacemesh/sql/localsql"
"github.com/spacemeshos/go-spacemesh/sql/localsql/nipost"
"github.com/spacemeshos/go-spacemesh/sql/poets"
Expand Down Expand Up @@ -115,6 +116,12 @@ func Recover(
return nil, fmt.Errorf("open old local database: %w", err)
}
defer localDB.Close()
logger.With().Info("clearing atx sync metadata from local database")
if err := localDB.WithTx(ctx, func(tx *sql.Tx) error {
return atxsync.Clear(tx)
}); err != nil {
return nil, fmt.Errorf("clear atxsync: %w", err)
}
preserve, err := RecoverWithDb(ctx, logger, db, localDB, fs, cfg)
switch {
case errors.Is(err, ErrCheckpointNotFound):
Expand Down
2 changes: 2 additions & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare3/eligibility"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
timeConfig "github.com/spacemeshos/go-spacemesh/timesync/config"
"github.com/spacemeshos/go-spacemesh/tortoise"
)
Expand Down Expand Up @@ -186,6 +187,7 @@ func MainnetConfig() Config {
OutOfSyncThresholdLayers: 36, // 3h
DisableMeshAgreement: true,
DisableAtxReconciliation: true,
AtxSync: atxsync.DefaultConfig(),
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
1 change: 1 addition & 0 deletions config/presets/fastnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func fastnet() config.Config {
conf.LayerDuration = 15 * time.Second
conf.Sync.Interval = 5 * time.Second
conf.Sync.GossipDuration = 10 * time.Second
conf.Sync.AtxSync.EpochInfoInterval = 20 * time.Second
conf.LayersPerEpoch = 4
conf.RegossipAtxInterval = 30 * time.Second

Expand Down
4 changes: 3 additions & 1 deletion config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare3/eligibility"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
timeConfig "github.com/spacemeshos/go-spacemesh/timesync/config"
"github.com/spacemeshos/go-spacemesh/tortoise"
)
Expand Down Expand Up @@ -137,10 +138,11 @@ func testnet() config.Config {
LOGGING: config.DefaultLoggingConfig(),
Sync: syncer.Config{
Interval: time.Minute,
EpochEndFraction: 0.8,
EpochEndFraction: 0.5,
MaxStaleDuration: time.Hour,
GossipDuration: 50 * time.Second,
OutOfSyncThresholdLayers: 10,
AtxSync: atxsync.DefaultConfig(),
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
62 changes: 55 additions & 7 deletions fetch/mesh_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"strings"
"sync"

"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/activation"
Expand Down Expand Up @@ -66,8 +68,11 @@ func (f *Fetch) getHashes(
pendingMetric := pendingHashReqs.WithLabelValues(string(hint))
pendingMetric.Add(float64(len(hashes)))

var eg errgroup.Group
var failed atomic.Uint64
var (
eg errgroup.Group
mu sync.Mutex
bfailure = BatchError{Errors: map[types.Hash32]error{}}
)
for i, hash := range hashes {
if err := options.limiter.Acquire(ctx, 1); err != nil {
pendingMetric.Add(float64(i - len(hashes)))
Expand Down Expand Up @@ -97,21 +102,24 @@ func (f *Fetch) getHashes(
options.limiter.Release(1)
pendingMetric.Add(-1)
if p.err != nil {
f.logger.Debug("failed to get hash",
f.logger.With().Debug("failed to get hash",
log.String("hint", string(hint)),
log.Stringer("hash", h),
log.Err(p.err),
)
failed.Add(1)

mu.Lock()
bfailure.Add(h, p.err)
mu.Unlock()
}
return nil
}
})
}

eg.Wait()
if failed.Load() > 0 {
return fmt.Errorf("failed to fetch %d hashes out of %d", failed.Load(), len(hashes))
if !bfailure.Empty() {
return &bfailure
}
return nil
}
Expand Down Expand Up @@ -313,3 +321,43 @@ func (f *Fetch) GetCert(
}
return nil, fmt.Errorf("failed to get cert %v/%s from %d peers: %w", lid, bid.String(), len(peers), ctx.Err())
}

// BatchError aggregates the per-hash failures of a batched fetch.
// A pointer to it is used as an error value (see the Error method), and it can
// be logged as a structured object through MarshalLogObject.
type BatchError struct {
// Errors maps every hash that failed to the error recorded for it.
Errors map[types.Hash32]error
}

// Empty reports whether the batch holds no recorded failures.
// A nil Errors map counts as empty.
func (b *BatchError) Empty() bool {
	recorded := len(b.Errors)
	return recorded == 0
}

// Add stores err as the failure for id, overwriting any error previously
// recorded for the same id. The map is allocated lazily, so Add is safe to
// call on a zero-value BatchError. Add does no locking of its own.
func (b *BatchError) Add(id types.Hash32, err error) {
	if b.Errors != nil {
		b.Errors[id] = err
		return
	}
	b.Errors = map[types.Hash32]error{id: err}
}

// Error implements the error interface.
//
// The message has the form "batch failure: <hash>=<error>; <hash>=<error>".
// Entries are separated by "; " so that individual failures stay readable when
// more than one hash failed. The entries come from a map, so their order is
// unspecified and may differ between calls.
func (b *BatchError) Error() string {
	var builder strings.Builder
	builder.WriteString("batch failure: ")
	first := true
	for hash, err := range b.Errors {
		// Separate entries; without this consecutive entries run together.
		if !first {
			builder.WriteString("; ")
		}
		first = false
		builder.WriteString(hash.ShortString())
		builder.WriteString("=")
		builder.WriteString(err.Error())
	}
	return builder.String()
}

// MarshalLogObject encodes the batch for structured logging as an "errors"
// array with one object per failed hash, each carrying the short hash under
// "id" and the error text under "error".
//
// Failures reported by the encoder are returned to the caller instead of
// being dropped.
func (b *BatchError) MarshalLogObject(encoder log.ObjectEncoder) error {
	return encoder.AddArray("errors", log.ArrayMarshalerFunc(func(arr log.ArrayEncoder) error {
		for hash, err := range b.Errors {
			entry := zapcore.ObjectMarshalerFunc(func(obj log.ObjectEncoder) error {
				obj.AddString("id", hash.ShortString())
				obj.AddString("error", err.Error())
				return nil
			})
			if appendErr := arr.AppendObject(entry); appendErr != nil {
				return appendErr
			}
		}
		return nil
	}))
}
8 changes: 8 additions & 0 deletions log/zap.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ func ZContext(ctx context.Context) zap.Field {
return zap.Inline(&marshalledContext{Context: ctx})
}

// NiceZapError turns err into a zap field. When err, or any error in its
// chain, can be extracted as an ObjectMarshaller via errors.As, that marshaller
// is inlined into the log entry; otherwise the error is logged with zap.Error.
func NiceZapError(err error) zap.Field {
	var structured ObjectMarshaller
	if !errors.As(err, &structured) {
		return zap.Error(err)
	}
	return zap.Inline(structured)
}

// Any builds a Field for key from an arbitrary value by delegating to zap.Any.
func Any(key string, value any) Field {
	zapField := zap.Any(key, value)
	return Field(zapField)
}
Expand Down
4 changes: 4 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,10 @@ func (app *App) initServices(ctx context.Context) error {
fetcher,
patrol,
app.certifier,
atxsync.New(fetcher, app.clock, app.db, app.localDB,
atxsync.WithConfig(app.Config.Sync.AtxSync),
atxsync.WithLogger(app.syncLogger.Zap()),
),
syncer.WithConfig(syncerConf),
syncer.WithLogger(app.syncLogger),
)
Expand Down
10 changes: 7 additions & 3 deletions p2p/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ type DecayingTagSpec struct {
Cap int `mapstructure:"cap"`
}

// ErrNotConnected is returned when peer is not connected.
var ErrNotConnected = errors.New("peer is not connected")
var (
// ErrNotConnected is returned when the peer is not connected.
ErrNotConnected = errors.New("peer is not connected")
// ErrPeerResponseFailed is returned when the peer responded with an error.
// Request wraps it together with the error text sent by the peer, so callers
// can match it with errors.Is.
ErrPeerResponseFailed = errors.New("peer response failed")
)

// Opt is a type to configure a server.
type Opt func(s *Server)
Expand Down Expand Up @@ -343,7 +347,7 @@ func (s *Server) Request(ctx context.Context, pid peer.ID, req []byte) ([]byte,
s.metrics.clientServerError.Inc()
s.metrics.clientLatency.Observe(took)
}
return nil, errors.New(data.Error)
return nil, fmt.Errorf("%w: %s", ErrPeerResponseFailed, data.Error)
case s.metrics != nil:
s.metrics.clientSucceeded.Inc()
s.metrics.clientLatency.Observe(took)
Expand Down
3 changes: 2 additions & 1 deletion p2p/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func TestServer(t *testing.T) {
})
t.Run("ReceiveError", func(t *testing.T) {
_, err := client.Request(ctx, mesh.Hosts()[2].ID(), request)
require.Equal(t, err, testErr)
require.ErrorIs(t, err, ErrPeerResponseFailed)
require.ErrorContains(t, err, testErr.Error())
})
t.Run("DialError", func(t *testing.T) {
_, err := client.Request(ctx, mesh.Hosts()[2].ID(), request)
Expand Down
94 changes: 94 additions & 0 deletions sql/atxsync/atxsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package atxsync

import (
"fmt"
"time"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
)

// GetSyncState loads the sync state stored for epoch from atx_sync_state.
//
// The result maps every ATX id stored for the epoch to its stored request
// count. The map is empty (not nil) when no rows exist. On a query failure a
// nil map and a wrapped error are returned.
func GetSyncState(db sql.Executor, epoch types.EpochID) (map[types.ATXID]int, error) {
	states := map[types.ATXID]int{}
	bind := func(stmt *sql.Statement) {
		stmt.BindInt64(1, int64(epoch))
	}
	collect := func(stmt *sql.Statement) bool {
		var id types.ATXID
		stmt.ColumnBytes(0, id[:])
		states[id] = int(stmt.ColumnInt64(1))
		return true
	}
	if _, err := db.Exec("select id, requests from atx_sync_state where epoch = ?1", bind, collect); err != nil {
		return nil, fmt.Errorf("select synced atx ids for epoch %v failed: %w", epoch, err)
	}
	return states, nil
}

// SaveSyncState persists the per-ATX request counters of epoch.
//
// For every entry in states:
//   - if the counter has reached max (or exceeds it), the row is deleted from
//     atx_sync_state;
//   - otherwise the row is inserted, or its requests column is updated when a
//     row for (epoch, id) already exists.
//
// Counters above max are treated like counters equal to max, so such rows are
// removed instead of being re-saved indefinitely. Processing stops at the
// first database error, which is returned wrapped. The function does not open
// a transaction; pass a transaction as db when the update must be atomic.
func SaveSyncState(db sql.Executor, epoch types.EpochID, states map[types.ATXID]int, max int) error {
	for id, requests := range states {
		if requests >= max {
			_, err := db.Exec(`delete from atx_sync_state where epoch = ?1 and id = ?2`,
				func(stmt *sql.Statement) {
					stmt.BindInt64(1, int64(epoch))
					stmt.BindBytes(2, id[:])
				}, nil)
			if err != nil {
				return fmt.Errorf("delete synced atx id %v/%v failed: %w", epoch, id.ShortString(), err)
			}
			continue
		}
		_, err := db.Exec(`insert into atx_sync_state
(epoch, id, requests) values (?1, ?2, ?3)
on conflict(epoch, id) do update set requests = ?3;`,
			func(stmt *sql.Statement) {
				stmt.BindInt64(1, int64(epoch))
				stmt.BindBytes(2, id[:])
				stmt.BindInt64(3, int64(requests))
			}, nil)
		if err != nil {
			return fmt.Errorf("insert synced atx id %v/%v failed: %w", epoch, id.ShortString(), err)
		}
	}
	return nil
}

// SaveRequestTime stores timestamp as the request time of epoch in
// atx_sync_requests, replacing the value of an existing row for that epoch.
// The time is persisted as Unix seconds, so sub-second precision is dropped.
func SaveRequestTime(db sql.Executor, epoch types.EpochID, timestamp time.Time) error {
	const upsert = `insert into atx_sync_requests
(epoch, timestamp) values (?1, ?2)
on conflict(epoch) do update set timestamp = ?2;`

	bind := func(stmt *sql.Statement) {
		stmt.BindInt64(1, int64(epoch))
		stmt.BindInt64(2, timestamp.Unix())
	}
	if _, err := db.Exec(upsert, bind, nil); err != nil {
		return fmt.Errorf("insert request time for epoch %v failed: %w", epoch, err)
	}
	return nil
}

// GetRequestTime reads the request time stored for epoch in
// atx_sync_requests. The stored Unix seconds are converted back to a
// time.Time. When no row exists for the epoch the returned error wraps
// sql.ErrNotFound; query failures are returned wrapped as well. In both error
// cases the returned time is the zero value.
func GetRequestTime(db sql.Executor, epoch types.EpochID) (time.Time, error) {
	var seconds int64
	bind := func(stmt *sql.Statement) {
		stmt.BindInt64(1, int64(epoch))
	}
	read := func(stmt *sql.Statement) bool {
		seconds = stmt.ColumnInt64(0)
		return true
	}
	rows, err := db.Exec("select timestamp from atx_sync_requests where epoch = ?1", bind, read)
	switch {
	case err != nil:
		return time.Time{}, fmt.Errorf("select request time for epoch %v failed: %w", epoch, err)
	case rows == 0:
		return time.Time{}, fmt.Errorf("%w: no request time for epoch %v", sql.ErrNotFound, epoch)
	}
	return time.Unix(seconds, 0), nil
}

// Clear deletes all rows from atx_sync_state and then from atx_sync_requests.
// It stops at the first failing statement and returns that error wrapped. The
// function does not open a transaction itself.
func Clear(db sql.Executor) error {
	if _, err := db.Exec(`delete from atx_sync_state`, nil, nil); err != nil {
		return fmt.Errorf("clear atx sync state failed: %w", err)
	}
	if _, err := db.Exec(`delete from atx_sync_requests`, nil, nil); err != nil {
		return fmt.Errorf("clear atx sync requests failed: %w", err)
	}
	return nil
}
Loading

0 comments on commit f938a53

Please sign in to comment.